From 5692c48ab7103a2051f351e08fd012fc9022d951 Mon Sep 17 00:00:00 2001 From: ReinUsesLisp Date: Tue, 21 Jul 2020 04:26:20 -0300 Subject: [PATCH] service/sockets: Add worker abstraction to execute blocking calls asynchronously This abstraction allows executing blocking functions (like recvfrom on a socket configured for blocking) without blocking the service thread. It is intended to be used with SleepClientThread. --- src/core/CMakeLists.txt | 1 + .../hle/service/sockets/blocking_worker.h | 132 ++++++++++++++++++ 2 files changed, 133 insertions(+) create mode 100644 src/core/hle/service/sockets/blocking_worker.h diff --git a/src/core/CMakeLists.txt b/src/core/CMakeLists.txt index 48578ad48..b96ca9374 100644 --- a/src/core/CMakeLists.txt +++ b/src/core/CMakeLists.txt @@ -491,6 +491,7 @@ add_library(core STATIC hle/service/sm/controller.h hle/service/sm/sm.cpp hle/service/sm/sm.h + hle/service/sockets/blocking_worker.h hle/service/sockets/bsd.cpp hle/service/sockets/bsd.h hle/service/sockets/ethc.cpp diff --git a/src/core/hle/service/sockets/blocking_worker.h b/src/core/hle/service/sockets/blocking_worker.h new file mode 100644 index 000000000..7bd486530 --- /dev/null +++ b/src/core/hle/service/sockets/blocking_worker.h @@ -0,0 +1,132 @@ +// Copyright 2020 yuzu emulator team +// Licensed under GPLv2 or any later version +// Refer to the license.txt file included. + +#pragma once + +#include +#include +#include +#include +#include +#include + +#include + +#include "common/assert.h" +#include "common/microprofile.h" +#include "common/thread.h" +#include "core/core.h" +#include "core/hle/kernel/hle_ipc.h" +#include "core/hle/kernel/kernel.h" +#include "core/hle/kernel/thread.h" +#include "core/hle/kernel/writable_event.h" + +namespace Service::Sockets { + +/** + * Worker abstraction to execute blocking calls on host without blocking the guest thread + * + * @tparam Service Service where the work is executed + * @tparam ...Types Types of work to execute + */ +template +class BlockingWorker { + using This = BlockingWorker; + using WorkVariant = std::variant; + +public: + /// Create a new worker + static std::unique_ptr Create(Core::System& system, Service* service, + std::string_view name) { + return std::unique_ptr(new This(system, service, name)); + } + + ~BlockingWorker() { + while (!is_available.load(std::memory_order_relaxed)) { + // Busy wait until work is finished + std::this_thread::yield(); + } + // Monostate means to exit the thread + work = std::monostate{}; + work_event.Set(); + thread.join(); + } + + /** + * Try to capture the worker to send work after a success + * @returns True when the worker has been successfully captured + */ + bool TryCapture() { + bool expected = true; + return is_available.compare_exchange_weak(expected, false, std::memory_order_relaxed, + std::memory_order_relaxed); + } + + /** + * Send work to this worker abstraction + * @see TryCapture must be called before attempting to call this function + */ + template + void SendWork(Work new_work) { + ASSERT_MSG(!is_available, "Trying to send work on a worker that's not captured"); + work = std::move(new_work); + work_event.Set(); + } + + /// Generate a callback for @see SleepClientThread + template + auto Callback() { + return [this](std::shared_ptr, Kernel::HLERequestContext& ctx, + Kernel::ThreadWakeupReason reason) { + ASSERT(reason == Kernel::ThreadWakeupReason::Signal); + std::get(work).Response(ctx); + is_available.store(true); + }; + } + + /// Get kernel event that will be signalled by the worker when the host operation finishes + std::shared_ptr KernelEvent() const { + return kernel_event; + } + +private: + explicit BlockingWorker(Core::System& system, Service* service, std::string_view name) { + auto pair = Kernel::WritableEvent::CreateEventPair(system.Kernel(), std::string(name)); + kernel_event = std::move(pair.writable); + thread = std::thread([this, &system, service, name] { Run(system, service, name); }); + } + + void Run(Core::System& system, Service* service, std::string_view name) { + system.RegisterHostThread(); + + const std::string thread_name = fmt::format("yuzu:{}", name); + MicroProfileOnThreadCreate(thread_name.c_str()); + Common::SetCurrentThreadName(thread_name.c_str()); + + bool keep_running = true; + while (keep_running) { + work_event.Wait(); + + const auto visit_fn = [service, &keep_running](auto&& w) { + using T = std::decay_t; + if constexpr (std::is_same_v) { + keep_running = false; + } else { + w.Execute(service); + } + }; + std::visit(visit_fn, work); + + kernel_event->Signal(); + } + } + + std::thread thread; + WorkVariant work; + Common::Event work_event; + std::shared_ptr kernel_event; + std::atomic_bool is_available{true}; +}; + +} // namespace Service::Sockets