Precommit (#1)

* first commit

* cleanup
This commit is contained in:
tompzf
2025-11-04 13:28:06 +01:00
committed by GitHub
parent dba45dc636
commit 6ed4b1534e
898 changed files with 256340 additions and 0 deletions

View File

@@ -0,0 +1,32 @@
# Define project
project(ipc_shared_mem VERSION 1.0 LANGUAGES CXX)
# Define target
add_library(ipc_shared_mem SHARED
"channel_mgnt.h"
"channel_mgnt.cpp"
"connection.h"
"connection.cpp"
"shared_mem_buffer_posix.h"
"shared_mem_buffer_windows.h"
"in_process_mem_buffer.h"
"mem_buffer_accessor.h"
"mem_buffer_accessor.cpp"
"watchdog.h"
"watchdog.cpp"
)
if(UNIX)
target_link_libraries(ipc_shared_mem rt ${CMAKE_DL_LIBS} ${CMAKE_THREAD_LIBS_INIT})
else()
target_link_libraries(ipc_shared_mem ${CMAKE_THREAD_LIBS_INIT})
endif()
target_link_options(ipc_shared_mem PRIVATE)
target_include_directories(ipc_shared_mem PRIVATE ./include/)
set_target_properties(ipc_shared_mem PROPERTIES PREFIX "")
set_target_properties(ipc_shared_mem PROPERTIES SUFFIX ".sdv")
# Build dependencies
add_dependencies(ipc_shared_mem CompileCoreIDL)
# Appending the service in the service list
set(SDV_Service_List ${SDV_Service_List} ipc_shared_mem PARENT_SCOPE)

View File

@@ -0,0 +1,97 @@
#include "channel_mgnt.h"
#include "connection.h"
#include <support/toml.h>
void CSharedMemChannelMgnt::Initialize(const sdv::u8string& /*ssObjectConfig*/)
{
if (m_eObjectStatus != sdv::EObjectStatus::initialization_pending)
{
m_eObjectStatus = sdv::EObjectStatus::initialization_failure;
return;
}
m_eObjectStatus = sdv::EObjectStatus::initialized;
}
sdv::EObjectStatus CSharedMemChannelMgnt::GetStatus() const
{
return m_eObjectStatus;
}
void CSharedMemChannelMgnt::SetOperationMode(sdv::EOperationMode eMode)
{
switch (eMode)
{
case sdv::EOperationMode::configuring:
if (m_eObjectStatus == sdv::EObjectStatus::running || m_eObjectStatus == sdv::EObjectStatus::initialized)
m_eObjectStatus = sdv::EObjectStatus::configuring;
break;
case sdv::EOperationMode::running:
if (m_eObjectStatus == sdv::EObjectStatus::configuring || m_eObjectStatus == sdv::EObjectStatus::initialized)
m_eObjectStatus = sdv::EObjectStatus::running;
break;
default:
break;
}
}
void CSharedMemChannelMgnt::Shutdown()
{
m_eObjectStatus = sdv::EObjectStatus::shutdown_in_progress;
m_watchdog.Clear();
m_eObjectStatus = sdv::EObjectStatus::destruction_pending;
}
sdv::ipc::SChannelEndpoint CSharedMemChannelMgnt::CreateEndpoint(/*in*/ const sdv::u8string& ssEndpointConfig)
{
std::string ssName;
uint32_t uiSize = 10*1024;
if (!ssEndpointConfig.empty())
{
sdv::toml::CTOMLParser config(ssEndpointConfig);
ssName = static_cast<std::string>(config.GetDirect("IpcChannel.Name").GetValue());
if (!ssName.empty())
{
uiSize = config.GetDirect("IpcChannel.Size").GetValue();
if (!uiSize) uiSize = 128 * 1024;
}
}
// Create a connection
std::shared_ptr<CConnection> ptrConnection = std::make_shared<CConnection>(m_watchdog, uiSize, ssName, true);
// Ignore cppcheck warning; normally the returned pointer should always have a value at this stage (otherwise an
// exception was triggered).
// cppcheck-suppress knownConditionTrueFalse
if (!ptrConnection)
return {};
m_watchdog.AddConnection(ptrConnection);
// Return the connection details.
sdv::ipc::SChannelEndpoint connectionEndpoint{};
connectionEndpoint.pConnection = static_cast<IInterfaceAccess*>(ptrConnection.get());
connectionEndpoint.ssConnectString = ptrConnection->GetConnectionString();
return connectionEndpoint;
}
sdv::IInterfaceAccess* CSharedMemChannelMgnt::Access(const sdv::u8string& ssConnectString)
{
sdv::toml::CTOMLParser parser(ssConnectString);
if (!parser.IsValid()) return nullptr;
// Is this a configuration provided by the endpoint (uses a "Provider" key), then this is a connection string. Use this
// to connect to the shared memory.
std::shared_ptr<CConnection> ptrConnection;
if (parser.GetDirect("Provider").IsValid())
ptrConnection = std::make_shared<CConnection>(m_watchdog, ssConnectString.c_str());
else
{
std::string ssName = static_cast<std::string>(parser.GetDirect("IpcChannel.Name").GetValue());
ptrConnection = std::make_shared<CConnection>(m_watchdog, 0,ssName, false);
}
if (!ptrConnection) return {};
m_watchdog.AddConnection(ptrConnection);
// Return the connection
IInterfaceAccess* pInterface = ptrConnection.get();
return pInterface;
}

View File

@@ -0,0 +1,120 @@
#ifndef CHANNEL_MGNT_H
#define CHANNEL_MGNT_H
#include <support/component_impl.h>
#include <interfaces/ipc.h>
#include "connection.h"
#include "shared_mem_buffer_posix.h"
#include "shared_mem_buffer_windows.h"
#include "watchdog.h"
#include "connection.h"
#define TEST_DECLARE_OBJECT_CLASS_ALIAS(...) \
static sdv::sequence<sdv::u8string> GetClassAliasesStaticMyTest() \
{ \
return sdv::sequence<sdv::u8string>({__VA_ARGS__}); \
}
/**
* @brief IPC channel management class for the shared memory communication.
*/
class CSharedMemChannelMgnt : public sdv::CSdvObject, public sdv::IObjectControl, public sdv::ipc::ICreateEndpoint,
public sdv::ipc::IChannelAccess
{
public:
// Interface map
BEGIN_SDV_INTERFACE_MAP()
SDV_INTERFACE_ENTRY(sdv::IObjectControl)
SDV_INTERFACE_ENTRY(sdv::ipc::IChannelAccess)
SDV_INTERFACE_ENTRY(sdv::ipc::ICreateEndpoint)
END_SDV_INTERFACE_MAP()
// Object declarations
DECLARE_OBJECT_CLASS_TYPE(sdv::EObjectType::SystemObject)
DECLARE_OBJECT_CLASS_NAME("DefaultSharedMemoryChannelControl")
DECLARE_OBJECT_CLASS_ALIAS("LocalChannelControl")
DECLARE_DEFAULT_OBJECT_NAME("LocalChannelControl")
DECLARE_OBJECT_SINGLETON()
/**
* @brief Initialize the object. Overload of sdv::IObjectControl::Initialize.
* @param[in] ssObjectConfig Optional configuration string.
*/
void Initialize(const sdv::u8string& ssObjectConfig) override;
/**
* @brief Get the current status of the object. Overload of sdv::IObjectControl::GetStatus.
* @return Return the current status of the object.
*/
sdv::EObjectStatus GetStatus() const override;
/**
* @brief Set the component operation mode. Overload of sdv::IObjectControl::SetOperationMode.
* @param[in] eMode The operation mode, the component should run in.
*/
void SetOperationMode(sdv::EOperationMode eMode) override;
/**
* @brief Shutdown called before the object is destroyed. Overload of sdv::IObjectControl::Shutdown.
*/
void Shutdown() override;
/**
* @brief Create IPC connection object and return the endpoint information. Overload of
* sdv::ipc::ICreateEndpoint::CreateEndpoint.
* @details The endpoints are generated using either a size and a name based on the provided channel configuration or if no
* configuration is supplied a default size of 10k and a randomly generated name. The following configuration
* can be supplied:
* @code
* [IpcChannel]
* Name = "CHANNEL_1234"
* Size = 10240
* @endcode
* @param[in] ssChannelConfig Optional channel type specific endpoint configuration.
* @return IPC connection object
*/
sdv::ipc::SChannelEndpoint CreateEndpoint(/*in*/ const sdv::u8string& ssChannelConfig) override;
/**
* @brief Create a connection object from the channel connection parameters string
* @param[in] ssConnectString Reference to the string containing the channel connection parameters.
* @return Pointer to IInterfaceAccess interface of the connection object or NULL when the object cannot be created.
*/
sdv::IInterfaceAccess* Access(const sdv::u8string& ssConnectString) override;
private:
/**
* @brief Shared memory bridge
* @attention The bridge is created here, but potentially used by two separated processed. To prevent channel destruction
* keep the bridge alive.
* @attention Under Posix, the unmapping in one process counts for all connections to this buffer within the process. Creating
* additional buffer access interfaces might result in the buffer becoming invalid when one of them is removed again.
*/
struct SChannel
{
/**
* @brief Constructor
*/
SChannel() : bufferTargetTx(bufferOriginRx.GetConnectionString()), bufferTargetRx(bufferOriginTx.GetConnectionString())
{
if (!bufferOriginRx.IsValid()) std::cout << "Channel Origin RX is invalid!" << std::endl;
if (!bufferOriginTx.IsValid()) std::cout << "Channel Origin TX is invalid!" << std::endl;
if (!bufferTargetRx.IsValid()) std::cout << "Channel Target RX is invalid!" << std::endl;
if (!bufferTargetTx.IsValid()) std::cout << "Channel Target TX is invalid!" << std::endl;
}
CSharedMemBufferTx bufferOriginTx; ///< Origin Tx channel
CSharedMemBufferRx bufferOriginRx; ///< Origin Rx channel
CSharedMemBufferTx bufferTargetTx; ///< Target Tx channel
CSharedMemBufferRx bufferTargetRx; ///< Target Rx channel
};
sdv::EObjectStatus m_eObjectStatus = sdv::EObjectStatus::initialization_pending; ///< Object status.
std::map<std::string, std::unique_ptr<SChannel>> m_mapChannels; ///< Map with channels.
CWatchDog m_watchdog; ///< Process monitor for connections.
};
DEFINE_SDV_OBJECT(CSharedMemChannelMgnt)
#endif // ! defined CHANNEL_MGNT_H

File diff suppressed because it is too large Load Diff

View File

@@ -0,0 +1,453 @@
/**
* @file connection.h
* @author Erik Verhoeven FRD DISDS1 (mailto:erik.verhoeven@zf.com)
* @brief Implementation of connection class.
* @version 2.0
* @date 2024-06-24
*
* @copyright Copyright ZF Friedrichshafen AG (c) 2023-2025
*
*/
#ifndef CHANNEL_H
#define CHANNEL_H
/// Enables the reporting of messages when set to...
/// 1. info only (no protocol, no data)
/// 2. info and protocol (no data)
/// 3. info, protocol and data protocol
/// 4. info, protocol, data protocol and data content
#define ENABLE_REPORTING 0
/// When put to 1, decoupling of receive data is activated (default is not activated).
#define ENABLE_DECOUPLING 0
#if ENABLE_REPORTING > 0
/// Enable tracing
#define ENABLE_TRACE 1
#endif
#include <thread>
#include <algorithm>
#include "in_process_mem_buffer.h"
#include "shared_mem_buffer_posix.h"
#include "shared_mem_buffer_windows.h"
#include <interfaces/ipc.h>
#include <interfaces/process.h>
#include <support/interface_ptr.h>
#include <support/local_service_access.h>
#include <support/component_impl.h>
#include <queue>
#include <list>
#include "../../global/trace.h"
#ifdef _MSC_VER
#pragma comment(lib, "Ws2_32.lib")
#endif
// Forward declaration
class CWatchDog;
/**
* Class for local IPC connection
* Created and managed by IPCAccess::AccessLocalIPCConnection(best use unique_ptr to store, so memory address stays
* valid)
*/
class CConnection : public std::enable_shared_from_this<CConnection>, public sdv::IInterfaceAccess, public sdv::IObjectDestroy,
public sdv::ipc::IDataSend, public sdv::ipc::IConnect
{
public:
/**
* @brief default constructor used by create endpoint - allocates new buffers m_Sender and m_Receiver
* @param[in] rWatchDog Reference to the watch dog object monitoring the connected processes.
* @param[in] uiSize Optional size of the buffer. If zero, a default buffer size of 10k is configured.
* @param[in] rssName Optional name to be used for the connection. If empty, a random name is generated.
* @param[in] bServer When set, the connection is the server connection; otherwise it is the client connection (determines the
* initial communication).
*/
CConnection(CWatchDog& rWatchDog, uint32_t uiSize, const std::string& rssName, bool bServer);
/**
* @brief Access existing connection
* @param[in] rWatchDog Reference to the watch dog object monitoring the connected processes.
* @param[in] rssConnectionString Reference to string with connection information.
*/
CConnection(CWatchDog& rWatchDog, const std::string& rssConnectionString);
/**
* @brief Virtual destructor needed for "delete this;".
*/
virtual ~CConnection();
BEGIN_SDV_INTERFACE_MAP()
SDV_INTERFACE_ENTRY(sdv::ipc::IDataSend)
SDV_INTERFACE_ENTRY(sdv::ipc::IConnect)
SDV_INTERFACE_ENTRY(sdv::IObjectDestroy)
END_SDV_INTERFACE_MAP()
/**
* @brief get the connection string for the sender and the receiver
* @return Returns the connection string for the sender and the receiver together
*/
std::string GetConnectionString();
/**
* @brief Sends data consisting of multiple data chunks via the IPC connection.
* Overload of sdv::ipc::IDataSend::SendData.
* @param[inout] seqData Sequence of data buffers to be sent. The sequence might be changed to optimize the communication
* without having to copy the data.
* @return Return 'true' if all data could be sent; 'false' otherwise.
*/
virtual bool SendData(/*inout*/ sdv::sequence<sdv::pointer<uint8_t>>& seqData) override;
/**
* @brief Establish a connection and start sending/receiving messages. Overload of
* sdv::ipc::IConnect::AsyncConnect.
* @param[in] pReceiver The message has to be forwarded.
* @return Returns 'true' when a connection could be established. Use IConnectStatus or IConnectEventCallback to check the
* connection state.
*/
virtual bool AsyncConnect(/*in*/ sdv::IInterfaceAccess* pReceiver) override;
/**
* @brief Wait for a connection to take place. Overload of sdv::ipc::IConnect::WaitForConnection.
* @param[in] uiWaitMs Wait for a connection to take place. A value of 0 doesn't wait at all, a value of 0xffffffff
* waits for infinite time.
* @return Returns 'true' when a connection took place.
*/
virtual bool WaitForConnection(/*in*/ uint32_t uiWaitMs) override;
/**
* @brief Cancel a wait for connection. Overload of sdv::ipc::IConnect::CancelWait.
*/
virtual void CancelWait() override;
// Suppress cppcheck warning. The destructor calls Disconnect without dynamic binding. This is correct so.
// cppcheck-suppress virtualCallInConstructor
/**
* @brief Disconnect from a connection. This will set the connect status to disconnected. Overload of
* sdv::ipc::IConnect::Disconnect.
*/
virtual void Disconnect() override;
/**
* @brief Register event callback interface. Overload of sdv::ipc::IConnect::RegisterStatusEventCallback.
* @details Register a connection status event callback interface. The exposed interface must be of type
* IConnectEventCallback. The registration will exist until a call to the unregister function with the returned cookie
* or until the connection is terminated.
* @param[in] pEventCallback Pointer to the object exposing the IConnectEventCallback interface.
* @return The cookie assigned to the registration.
*/
virtual uint64_t RegisterStatusEventCallback(/*in*/ sdv::IInterfaceAccess* pEventCallback) override;
/**
* @brief Unregister the status event callback with the returned cookie from the registration. Overload of
* sdv::ipc::IConnect::UnregisterStatusEventCallback.
* @param[in] uiCookie The cookie returned by a previous call to the registration function.
*/
virtual void UnregisterStatusEventCallback(/*in*/ uint64_t uiCookie) override;
/**
* @brief Get status of the connection. Overload of sdv::ipc::IConnect::GetStatus.
* @return Returns the ipc::EConnectStatus struct
*/
virtual sdv::ipc::EConnectStatus GetStatus() const override;
/**
* @brief Destroy the object. Overload of IObjectDestroy::DestroyObject.
* @attention After a call of this function, all exposed interfaces render invalid and should not be used any more.
*/
virtual void DestroyObject() override;
/**
* @brief Set the connection status and if needed call the event callback.
* @param[in] eStatus The new status.
*/
void SetStatus(sdv::ipc::EConnectStatus eStatus);
/**
* @brief Returns whether this is a server connection or a client connection.
* @return The server connection flag. If 'true' the connection is a server connection; otherwise a client connection.
*/
bool IsServer() const;
#ifdef TIME_TRACKING
/**
* @brief Get the last fragment sent time. Used to detect gaps.
* @return The last sent time.
*/
std::chrono::high_resolution_clock::time_point GetLastSentTime() const { return m_tpLastSent; }
/**
* @brief Get the last fragment received time. Used to detect gaps.
* @return The last received time.
*/
std::chrono::high_resolution_clock::time_point GetLastReceiveTime() const { return m_tpLastReceived; }
/**
* @brief Get the last fragment received loop time. Used to detect gaps.
* @return The last received loop time.
*/
std::chrono::duration<double> GetLargestReceiveLoopDuration() const { return m_durationLargestDeltaReceived; }
#endif
private:
#if ENABLE_REPORTING > 0
template <typename... TArgs>
void Trace(TArgs... tArgs) const
{
return ::Trace("this=", static_cast<const void*>(this), " ", tArgs...);
}
#endif
/**
* @brief Message type enum
*/
enum class EMsgType : uint32_t
{
sync_request = 0, ///< Sync request message (version check; no data).
sync_answer = 1, ///< Sync answer message (version check; no data).
connect_request = 10, ///< Connection initiation request (SConnectMsg is used)
connect_answer = 11, ///< Connection answer request (SConnectMsg is used)
connect_term = 90, ///< Connection terminated
data = 0x10000000, ///< Data message
data_fragment = 0x10000001, ///< Data fragment (if data is longer than 1/4th of the buffer).
};
/**
* @brief Message header
*/
struct SMsgHdr
{
uint32_t uiVersion; ///< Header version
EMsgType eType; ///< Type of packet
};
/**
* @brief Connection initiation message
*/
struct SConnectMsg : SMsgHdr
{
sdv::process::TProcessID tProcessID; ///< Process ID needed for lifetime monitoring
};
/**
* @brief Fragmented data message header.
*/
struct SFragmentedMsgHdr : SMsgHdr
{
uint32_t uiTotalLength; ///< The total length the data has.
uint32_t uiOffset; ///< Current offset of the data.
};
/**
* @brief Event callback structure.
*/
struct SEventCallback
{
uint64_t uiCookie = 0; ///< Registration cookie
sdv::ipc::IConnectEventCallback* pCallback = nullptr; ///< Pointer to the callback. Could be NULL when the callback
///< was deleted.
};
CWatchDog& m_rWatchDog; ///< Reference to the watch dog object monitoring
///< the connected processes.
sdv::CLifetimeCookie m_cookie = sdv::CreateLifetimeCookie(); ///< Lifetime cookie to manage module lifetime.
CSharedMemBufferTx m_sender; ///< Shared buffer for sending.
CSharedMemBufferRx m_receiver; ///< Shared buffer for receiving.
std::thread m_threadReceive; ///< Thread which receives data from the socket.
std::atomic<sdv::ipc::EConnectStatus> m_eStatus = sdv::ipc::EConnectStatus::uninitialized; ///< the status of the connection
sdv::ipc::IDataReceiveCallback* m_pReceiver = nullptr; ///< Receiver to pass the messages to if available
std::shared_mutex m_mtxEventCallbacks; ///< Protect access to callback list. Only locking when
///< inserting.
std::list<SEventCallback> m_lstEventCallbacks; ///< List containing event callbacks. New callbacks will
///< be inserted in front (called first). Removed
///< callbacks are NULL; the entry stays to allow
///< removal during a SetStatus call.
mutable std::mutex m_mtxSend; ///< Synchronize all packages to be sent.
std::mutex m_mtxConnect; ///< Connection mutex.
std::condition_variable m_cvConnect; ///< Connection variable for connecting.
std::condition_variable m_cvStartConnect; ///< Start connection variable for connecting.
bool m_bStarted = false; ///< When set, the reception thread has started.
bool m_bServer = false; ///< When set, the connection is a server connection.
#if ENABLE_DECOUPLING > 0
std::mutex m_mtxReceive; ///< Protect receive queue.
std::queue<sdv::sequence<sdv::pointer<uint8_t>>> m_queueReceive; ///< Receive queue to decouple receiving and processing.
std::thread m_threadDecoupleReceive; ///< Decoupled receive thread.
std::condition_variable m_cvReceiveAvailable; ///< Condition variable synchronizing the processing.
std::condition_variable m_cvReceiveProcessed; ///< Condition variable synchronizing the processing.
#endif
#ifdef TIME_TRACKING
std::chrono::high_resolution_clock::time_point m_tpLastSent{}; ///< Last time a fragment was sent.
std::chrono::high_resolution_clock::time_point m_tpLastReceived{}; ///< Last time a fragment was received.
std::chrono::duration<double> m_durationLargestDeltaReceived; ///< Largest duration
#endif
/**
* @brief Raw send function.
* @param[in] pData to be send
* @param[in] uiDataLength size of the data to be sent
* @return Returns number of bytes which has been sent
*/
uint32_t Send(const void* pData, uint32_t uiDataLength);
/**
* @brief Templated send implementation
* @tparam T Type of data (structure) to send
* @param[in] rt Reference to the data (structure).
* @return Returns 'true' on successful sending; otherwise returns 'false'.
*/
template <typename T>
bool Send(const T& rt)
{
return Send(&rt, sizeof(rt)) == sizeof(rt);
}
/**
* @brief Function to receive data, runs in a thread
*/
void ReceiveMessages();
/**
* @brief Message context structure used when receiving data.
*/
class CMessage : public CAccessorRxPacket
{
public:
/**
* @brief Constructor moving the packet content into the message.
* @param[in] rPacket Reference to the packet to assign.
*/
CMessage(CAccessorRxPacket&& rPacket);
/**
* @brief Destructor accepting the packet if not previously rejected by calling Reset.
*/
~CMessage();
/**
* @brief Returns whether the message is valid (has at least the size of the required header).
* @return
*/
bool IsValid() const;
/**
* @brief Get the message header if the data is at least the size of the header.
* @return The message header or an empty header.
*/
SMsgHdr GetMsgHdr() const;
/**
* @brief Get the connect header if the data is at least the size of the header and has type connect header.
* @return The connect header or an empty header.
*/
SConnectMsg GetConnectHdr() const;
/**
* @brief Get the fragmented message header. if the data is at least the size of the header and has type fragmented header.
* @return The fragmented message header or an empty header.
*/
SFragmentedMsgHdr GetFragmentedHdr() const;
//// The various headers have the SMsgHdr in common.
//union
//{
// SMsgHdr sMsgHdr; ///< Current message header
// SConnectMsg sConnectHdr; ///< Connect header
// SFragmentedMsgHdr sFragmentHdr; ///< Fragment header
// uint8_t rgData[std::max(sizeof(sConnectHdr), sizeof(sFragmentHdr))];
//};
//uint32_t uiSize = 0; ///< Complete size of the message (incl. size of the header)
//uint32_t uiOffset = 0; ///< Current read offset within the message. Complete message when offset == size.
/**
* @brief Trace the protocol data (dependent on ENABLE_REPORTING setting).
* @param[in] rConnection Reference to the connection class containing the connection information.
*/
void PrintHeader(const CConnection& rConnection) const;
};
/**
* @brief Data context structure
*/
struct SDataContext
{
uint32_t uiTotalSize = 0; ///< The total data size among all messages (without message header).
uint32_t uiCurrentOffset = 0; ///< The current offset within the complete fragmented data to be filled during the read process.
size_t nChunkIndex = 0; ///< The current chunk index that is to be filled during the read process.
uint32_t uiChunkOffset = 0; ///< The offset within the current chunk of data to be filled during the read process.
sdv::sequence<sdv::pointer<uint8_t>> seqDataChunks; ///< The data chunks allocated during table reading and available after uiCurrentOffset is identical to uiTotalSize.
};
/**
* @brief Read the data size table (amount of data buffers followed by the size of each buffer).
* @param[in] rMessage Reference to the message containing the table.
* @param[in] rsDataCtxt Data context structure to be initialized - buffers will be allocated.
* @return Returns the current offset of the data within the buffer following the table or 0 if the table could not be read.
*/
uint32_t ReadDataTable(CMessage& rMessage, SDataContext& rsDataCtxt);
/**
* @brief Read the data chunk to the buffers created by the ReadDataTable function. Subsequent calls can be made to this
* function to fill the buffers. The last call (when the chunk index passes the last index in the table) the data will be
* dispatched.
* @param[in] rMessage Reference to the message containing the table.
* @param[in] uiOffset The offset within the message data to start reading the data chunk.
* @param[in] rsDataCtxt Data context structure to be filled.
* @return Returns 'true' if the table could be read successfully; false if not.
*/
bool ReadDataChunk(CMessage& rMessage, uint32_t uiOffset, SDataContext& rsDataCtxt);
#if ENABLE_DECOUPLING > 0
/**
* @brief Decoupled receive data. Prevents blocking the receive buffer while processing.
*/
void DecoupleReceive();
#endif
/**
* @brief Received a synchronization request.
* @param[in] rMessage Reference to the message containing the request.
*/
void ReceiveSyncRequest(const CMessage& rMessage);
/**
* @brief Received a connection request.
* @param[in] rMessage Reference to the message containing the request.
*/
void ReceiveConnectRequest(const CMessage& rMessage);
/**
* @brief Received a synchronization answer.
* @param[in] rMessage Reference to the message containing the answer.
*/
void ReceiveSyncAnswer(const CMessage& rMessage);
/**
* @brief Received a connection answer.
* @param[in] rMessage Reference to the message containing the answer.
*/
void ReceiveConnectAnswer(const CMessage& rMessage);
/**
* @brief Received a connection termination information.
* @param[in] rMessage Reference to the message containing the information.
*/
void ReceiveConnectTerm(CMessage& rMessage);
/**
* @brief Received data message.
* @param[in] rMessage Reference to the message containing the information.
* @param[in] rsDataCtxt Reference to the data message context.
*/
void ReceiveDataMessage(CMessage& rMessage, SDataContext& rsDataCtxt);
/**
* @brief Received data fragment message.
* @param[in] rMessage Reference to the message containing the information.
* @param[in] rsDataCtxt Reference to the data message context.
*/
void ReceiveDataFragementMessage(CMessage& rMessage, SDataContext& rsDataCtxt);
};
#endif // !define CHANNEL_H

View File

@@ -0,0 +1,219 @@
#ifndef IN_PROCESS_MEM_BUFFER_H
#define IN_PROCESS_MEM_BUFFER_H
#include <memory>
#include <cassert>
#include <string>
#include <condition_variable>
#include <support/toml.h>
#include "mem_buffer_accessor.h"
/**
* @brief In-process memory buffer.
*/
template <class TAccessor>
class CInProcMemBuffer : public TAccessor
{
public:
/**
* @brief Default constructor
* @param[in] uiSize Size of the buffer (default is 1 MByte). Must not be zero.
*/
CInProcMemBuffer(uint32_t uiSize = 1048576);
/**
* @brief Connection constructor
* @param[in] rssConnectionString Reference to string with connection information.
*/
CInProcMemBuffer(const std::string& rssConnectionString);
/**
* \brief Default destructor
*/
~CInProcMemBuffer() = default;
/**
* @brief Return the connection string.
* @return The connection string to connect to this buffer.
*/
std::string GetConnectionString() const;
/**
* @brief Trigger listener that a write operation was completed.
*/
void TriggerDataSend() override;
/**
* @brief Wait for a write operation to be completed.
* @param[in] uiTimeoutMs The amount of time (in ms) to wait for a trigger.
* @return Returns 'true' when data was stored, 'false' when a timeout occurred.
*/
bool WaitForData(uint32_t uiTimeoutMs) const override;
/**
* @brief Trigger listener that a read operation was completed.
*/
void TriggerDataReceive() override;
/**
* @brief Wait for a read operation to be completed.
* @param[in] uiTimeoutMs The amount of time (in ms) to wait for a trigger.
* @return Returns 'true' when data was stored, 'false' when a timeout occurred.
*/
bool WaitForFreeSpace(uint32_t uiTimeoutMs) const override;
private:
/**
* @brief Synchronization object
*/
struct SSync : public std::enable_shared_from_this<SSync>
{
std::mutex mtx; ///< Mutex to synchronize.
std::condition_variable cv; ///< Condition variable to signal.
};
std::shared_ptr<SSync> m_ptrSyncTx; ///< Shared pointer to the TX synchronization object.
std::shared_ptr<SSync> m_ptrSyncRx; ///< Shared pointer to the RX synchronization object.
std::unique_ptr<uint8_t[]> m_ptrBuffer; ///< Smart pointer to the buffer (only for the allocator).
uint32_t m_uiSize = 0; ///< The size of the buffer.
std::string m_ssError; ///< The last reported error.
};
/**
* @brief Inproc memory buffer used for reading.
*/
using CInProcMemBufferRx = CInProcMemBuffer<CMemBufferAccessorRx>;
/**
* @brief Inproc memory buffer used for writing.
*/
using CInProcMemBufferTx = CInProcMemBuffer<CMemBufferAccessorTx>;
template <class TAccessor>
inline CInProcMemBuffer<TAccessor>::CInProcMemBuffer(uint32_t uiSize) :
m_ptrSyncTx(new SSync), m_ptrSyncRx(new SSync), m_ptrBuffer(std::unique_ptr<uint8_t[]>(new uint8_t[uiSize])), m_uiSize(uiSize)
{
assert(uiSize);
if (!uiSize) return;
TAccessor::Attach(m_ptrBuffer.get(), uiSize);
}
template <class TAccessor>
inline CInProcMemBuffer<TAccessor>::CInProcMemBuffer(const std::string& rssConnectionString)
{
if (rssConnectionString.empty())
{
m_ssError = "Missing connection string.";
return;
}
// Interpret the connection string
sdv::toml::CTOMLParser config(rssConnectionString);
// The connection string can contain multiple parameters. Search for the first parameters fitting the accessor direction
size_t nIndex = 0;
sdv::toml::CNodeCollection nodeConnectParamCollection = config.GetDirect("ConnectParam");
uint64_t uiLocation = 0ull, uiSyncTx = 0ull, uiSyncRx = 0ull;
do
{
sdv::toml::CNodeCollection nodeConnectParam;
switch (nodeConnectParamCollection.GetType())
{
case sdv::toml::ENodeType::node_array:
if (nIndex >= nodeConnectParamCollection.GetCount()) break;
nodeConnectParam = nodeConnectParamCollection[nIndex];
break;
case sdv::toml::ENodeType::node_table:
if (nIndex > 0) break;
nodeConnectParam = nodeConnectParamCollection;
break;
default:
break;
}
if (nodeConnectParam.GetType() != sdv::toml::ENodeType::node_table) break;
nIndex++;
// Check for shared memory
if (nodeConnectParam.GetDirect("Type").GetValue() != "inproc_mem") continue;
// Check the direction
if (nodeConnectParam.GetDirect("Direction").GetValue() !=
(TAccessor::GetAccessType() == EAccessType::rx ? "response" : "request"))
continue;
// Get the information
uiLocation = nodeConnectParam.GetDirect("Location").GetValue();
m_uiSize = static_cast<uint32_t>(nodeConnectParam.GetDirect("Size").GetValue());
uiSyncTx = nodeConnectParam.GetDirect("SyncTx").GetValue();
uiSyncRx = nodeConnectParam.GetDirect("SyncRx").GetValue();
break;
} while (true);
if (!uiLocation || !uiSyncTx || !uiSyncRx)
{
m_ssError = "Incomplete connection information.";
return;
}
m_ptrSyncTx = reinterpret_cast<SSync*>(uiSyncTx)->shared_from_this();
m_ptrSyncRx = reinterpret_cast<SSync*>(uiSyncRx)->shared_from_this();
TAccessor::Attach(reinterpret_cast<uint8_t*>(uiLocation));
}
template <class TAccessor>
inline std::string CInProcMemBuffer<TAccessor>::GetConnectionString() const
{
// The connection string contains the TOML file for connecting to this shared memory.
std::stringstream sstream;
sstream << "[[ConnectParam]]" << std::endl;
sstream << "Type = \"inproc_mem\"" << std::endl;
sstream << "Location = " << reinterpret_cast<uint64_t>(TAccessor::GetBufferPointer()) << std::endl;
sstream << "Size = " << m_uiSize << std::endl;
sstream << "SyncTx = " << reinterpret_cast<uint64_t>(m_ptrSyncTx.get()) << std::endl;
sstream << "SyncRx = " << reinterpret_cast<uint64_t>(m_ptrSyncRx.get()) << std::endl;
// The target direction is the opposite of the direction of the accessor. Therefore, if the accessor uses an RX access type,
// the target uses an TX access type and should be configured as response, otherwise it is a request.
sstream << "Direction = \"" << (TAccessor::GetAccessType() == EAccessType::rx ? "request" : "response") << "\"" << std::endl;
return sstream.str();
}
template <class TAccessor>
inline void CInProcMemBuffer<TAccessor>::TriggerDataSend()
{
if (!m_ptrSyncTx) return;
std::unique_lock lock(m_ptrSyncTx->mtx);
m_ptrSyncTx->cv.notify_all();
}
template <class TAccessor>
inline bool CInProcMemBuffer<TAccessor>::WaitForData(uint32_t uiTimeoutMs) const
{
if (!m_ptrSyncTx) return false;
// Check whether there is data; if so, return true.
if (TAccessor::HasUnreadData()) return true;
std::unique_lock lock(m_ptrSyncTx->mtx);
return m_ptrSyncTx->cv.wait_for(lock, std::chrono::milliseconds(uiTimeoutMs)) == std::cv_status::no_timeout;
}
template <class TAccessor>
inline void CInProcMemBuffer<TAccessor>::TriggerDataReceive()
{
if (!m_ptrSyncRx) return;
std::unique_lock lock(m_ptrSyncRx->mtx);
m_ptrSyncRx->cv.notify_all();
}
template <class TAccessor>
inline bool CInProcMemBuffer<TAccessor>::WaitForFreeSpace(uint32_t uiTimeoutMs) const
{
if (!m_ptrSyncRx) return false;
std::unique_lock lock(m_ptrSyncRx->mtx);
if (TAccessor::Canceled())
return false;
return m_ptrSyncRx->cv.wait_for(lock, std::chrono::milliseconds(uiTimeoutMs)) == std::cv_status::no_timeout;
}
#endif // !defined(IN_PROCESS_MEM_BUFFER_H)

View File

@@ -0,0 +1,529 @@
#include "mem_buffer_accessor.h"
#include <cassert>
void CMemBufferAccessorBase::Attach(uint8_t* pBuffer, uint32_t uiSize /*= 0*/)
{
// Attach is only allowed to be called once
assert(!m_pHdr && !m_pBuffer);
if (m_pHdr || m_pBuffer)
{
std::stringstream sstream;
sstream << "Accessor: Attaching is only allowed once (";
if (m_pHdr) sstream << "header";
if (m_pHdr && m_pBuffer) sstream << ", ";
if (m_pBuffer) sstream << "buffer";
sstream << ")" << std::endl;
std::cout << sstream.str();
return;
}
assert(!uiSize || uiSize > sizeof(SBufferHdr));
// Assign the buffer
assert(pBuffer);
m_pHdr = reinterpret_cast<SBufferHdr*>(pBuffer);
if (!m_pHdr)
{
std::cout << "Accessor: header is NULL" << std::endl;
return;
}
m_pBuffer = pBuffer + static_cast<uint32_t>(sizeof(SBufferHdr));
// If a size has been provided, initialize the header
if (uiSize)
{
*m_pHdr = SBufferHdr();
m_pHdr->uiSize = uiSize - static_cast<uint32_t>(sizeof(SBufferHdr));
}
// Check for correct interface version (to prevent misaligned communication).
assert(m_pHdr->uiVersion == SDVFrameworkInterfaceVersion);
// Check for the size to be larger than the buffer header and 64-bit aligned
assert(m_pHdr->uiSize > sizeof(SBufferHdr));
assert(m_pHdr->uiSize % 8 == 0);
if (m_pHdr->uiVersion != SDVFrameworkInterfaceVersion || m_pHdr->uiSize <= sizeof(SBufferHdr) || m_pHdr->uiSize % 8 != 0)
{
m_pHdr = nullptr;
m_pBuffer = nullptr;
}
}
void CMemBufferAccessorBase::Detach()
{
m_pBuffer = nullptr;
m_pHdr = nullptr;
}
bool CMemBufferAccessorBase::IsValid() const
{
return m_pHdr && m_pBuffer;
}
const uint8_t* CMemBufferAccessorBase::GetBufferPointer() const
{
return reinterpret_cast<uint8_t*>(m_pHdr);
}
CAccessorTxPacket::CAccessorTxPacket(CMemBufferAccessorTx& rAccessor, CMemBufferAccessorBase::SPacketHdr* pPacketHdr) :
m_pAccessor(&rAccessor)
{
// Checks
if (!pPacketHdr) return;
if (pPacketHdr->eType != CMemBufferAccessorBase::SPacketHdr::EType::data) return;
if (pPacketHdr->eState != CMemBufferAccessorBase::SPacketHdr::EState::reserved) return;
m_pPacketHdr = pPacketHdr;
}
CAccessorTxPacket::CAccessorTxPacket(CAccessorTxPacket&& rpacket) noexcept:
m_pAccessor(rpacket.m_pAccessor), m_pPacketHdr(rpacket.m_pPacketHdr)
{
rpacket.m_pAccessor = nullptr;
rpacket.m_pPacketHdr = nullptr;
}
CAccessorTxPacket::~CAccessorTxPacket()
{
Commit();
}
CAccessorTxPacket& CAccessorTxPacket::operator=(CAccessorTxPacket&& rpacket) noexcept
{
m_pAccessor = rpacket.m_pAccessor;
m_pPacketHdr = rpacket.m_pPacketHdr;
rpacket.m_pAccessor = nullptr;
rpacket.m_pPacketHdr = nullptr;
return *this;
}
CAccessorTxPacket::operator bool() const
{
return IsValid();
}
bool CAccessorTxPacket::IsValid() const
{
return m_pPacketHdr;
}
void CAccessorTxPacket::Commit()
{
if (m_pAccessor && m_pPacketHdr)
m_pAccessor->Commit(m_pPacketHdr);
m_pPacketHdr = nullptr;
}
uint32_t CAccessorTxPacket::GetSize() const
{
return m_pPacketHdr ? m_pPacketHdr->uiSize : 0;
}
uint8_t* CAccessorTxPacket::GetDataPtr()
{
return GetSize() ? reinterpret_cast<uint8_t*>(m_pPacketHdr + 1) : 0;
}
CMemBufferAccessorTx::~CMemBufferAccessorTx()
{
m_bBlockReserve = true;
// Wait until all reserved packets are committed (could cause a crash otherwise).
std::unique_lock<std::mutex> lock(m_mtxReservedPackes);
// Remove any committed packets
while (!m_queueReservedPackets.empty())
{
uint32_t uiTxPos = m_pHdr->uiTxPos;
SPacketHdr* pPacketHdr = m_queueReservedPackets.front();
// Is the top most packet not in committed state, we'll wait.
if (pPacketHdr->eState != SPacketHdr::EState::commit)
{
lock.unlock();
std::this_thread::sleep_for(std::chrono::milliseconds(10));
lock.lock();
continue;
}
// Update the write position
uiTxPos =
Align(static_cast<uint32_t>(reinterpret_cast<uint8_t*>(pPacketHdr) - m_pBuffer + sizeof(SPacketHdr))
+ pPacketHdr->uiSize);
// In case the write position is pointing to the end of the buffer, jump to the begin.
if (uiTxPos >= m_pHdr->uiSize)
{
uiTxPos = 0;
}
// Remove from queue
m_queueReservedPackets.pop_front();
m_pHdr->uiTxPos = uiTxPos;
}
}
std::optional<CAccessorTxPacket> CMemBufferAccessorTx::Reserve(uint32_t uiSize, uint32_t uiTimeoutMs)
{
if (m_bBlockReserve) return {};
if (!IsValid()) return {};
if (uiSize > m_pHdr->uiSize) return {};
uint32_t uiTxPos = 0;
bool bStuffingNeeded = false;
while (!m_bCancel)
{
// Create a snapshot of the read and write positions
// If exists, use the last stored position in the queue. Otherwise use the position from the header.
uint32_t uiRxPos = m_pHdr->uiRxPos;
uiTxPos = m_pHdr->uiTxPos;
std::unique_lock<std::mutex> lock(m_mtxReservedPackes);
if (!m_queueReservedPackets.empty())
uiTxPos = Align(reinterpret_cast<uint8_t*>(m_queueReservedPackets.back()) - m_pBuffer + sizeof(SPacketHdr)
+ m_queueReservedPackets.back()->uiSize);
// Calculate the needed size (incl header) aligned to 64 bits.
uint32_t uiNeededSize = Align(uiSize + static_cast<uint32_t>(sizeof(SPacketHdr)));
// If the read position is beyond the write position, the available space is the diference
// If the read position is behind the write position, the available space is eiher until the end of the buffer or if not
// fitting from the beginning of the buffer until the read position.
uint32_t uiMaxSize = 0;
bStuffingNeeded = false;
if (uiRxPos > uiTxPos) // uiTxPos made a roundtrip already
uiMaxSize = uiRxPos - uiTxPos - 1; // The last possible writing position is one before the reading position
else // uiRxPos is running after uiTxPos
{
// uiMaxSize is the rest of the buffer
uiMaxSize = m_pHdr->uiSize > uiTxPos ? m_pHdr->uiSize - uiTxPos : 0;
// When uiRxPos is at the beginning, this is a special situation, max size is the rest minus 1
if (!uiRxPos)
uiMaxSize--;
else if (uiMaxSize < uiNeededSize)
{
bStuffingNeeded = true;
uiMaxSize = uiRxPos - 1;
}
}
// Check for size
if (uiNeededSize <= uiMaxSize)
break;
// Wait for a reserve
if (!WaitForFreeSpace(uiTimeoutMs))
return {};
}
if (m_bCancel) return {};
// Stuffing needed?
if (bStuffingNeeded)
{
// Create stuffing packet... but only if the header still fits
if (m_pHdr->uiSize - uiTxPos >= static_cast<uint32_t>(sizeof(SPacketHdr)))
{
SPacketHdr* pStuffPacket = GetPacketHdr(uiTxPos);
pStuffPacket->eType = SPacketHdr::EType::stuffing;
pStuffPacket->uiSize = m_pHdr->uiSize - uiTxPos - static_cast<uint32_t>(sizeof(SPacketHdr));
pStuffPacket->eState = SPacketHdr::EState::commit;
}
else if (uiTxPos < m_pHdr->uiSize)
std::fill_n(m_pBuffer + uiTxPos, m_pHdr->uiSize - uiTxPos, static_cast<uint8_t>(0));
// After stuffing, the new location is at the beginning of the buffer.
uiTxPos = 0;
}
// Prepare a packet
SPacketHdr* pPacket = GetPacketHdr(uiTxPos);
pPacket->eType = SPacketHdr::EType::data;
pPacket->uiSize = uiSize;
pPacket->eState = SPacketHdr::EState::reserved;
// Add the packet to the queue
m_queueReservedPackets.push_back(pPacket);
// Create the packet
return CAccessorTxPacket(*this, pPacket);
}
void CMemBufferAccessorTx::Commit(SPacketHdr* pPacketHdr)
{
if (!IsValid()) return;
if (!pPacketHdr) return;
// pData needs to point to an area in the buffer starting at the offset of a packet header.
if (reinterpret_cast<uint8_t*>(pPacketHdr) > m_pBuffer + m_pHdr->uiSize)
return; // Pointing beyond the buffer
if (reinterpret_cast<uint8_t*>(pPacketHdr) < m_pBuffer)
return; // Pointing before the first possible packet header
// Check packet header
if (pPacketHdr->eType != SPacketHdr::EType::data)
return; // Must be of type data
if (pPacketHdr->eState != SPacketHdr::EState::reserved)
return; // Must have reserved state
if (reinterpret_cast<uint8_t*>(pPacketHdr) + pPacketHdr->uiSize + sizeof(SPacketHdr) > m_pBuffer + m_pHdr->uiSize)
return; // Size cannot be beyond buffer
// Commit the packet
pPacketHdr->eState = SPacketHdr::EState::commit;
// Trigger processing
TriggerDataSend();
// Run through the queue and check whether the top most packet is actually committed
std::unique_lock<std::mutex> lock(m_mtxReservedPackes);
uint32_t uiTxPos = m_pHdr->uiTxPos;
while (!m_queueReservedPackets.empty())
{
SPacketHdr* pPacketHdr2 = m_queueReservedPackets.front();
// Is the top most packet not in committed state, we're done.
if (pPacketHdr2->eState != SPacketHdr::EState::commit) break;
// Update the write position
uiTxPos = Align(static_cast<uint32_t>(reinterpret_cast<uint8_t*>(pPacketHdr2) - m_pBuffer + sizeof(SPacketHdr))
+ pPacketHdr2->uiSize);
// In case the write position is pointing to the end of the buffer, jump to the begin.
if (uiTxPos >= m_pHdr->uiSize)
{
uiTxPos = 0;
}
// Remove from queue
m_queueReservedPackets.pop_front();
}
m_pHdr->uiTxPos = uiTxPos;
}
bool CMemBufferAccessorTx::TryWrite(const void* pData, uint32_t uiSize)
{
if (!IsValid())
return false;
// pData is only allowed to be NULL when uiSize is zero
if (uiSize && !pData)
return false;
// Reserve a packet
auto optPacket = Reserve(uiSize);
if (!optPacket) return false;
// Copy the data
if (optPacket->GetSize())
std::copy(reinterpret_cast<const uint8_t*>(pData), reinterpret_cast<const uint8_t*>(pData) + uiSize, optPacket->GetDataPtr());
return true;
}
CAccessorRxPacket::CAccessorRxPacket(CMemBufferAccessorRx& rAccessor, CMemBufferAccessorBase::SPacketHdr* pPacketHdr) :
m_pAccessor(&rAccessor)
{
// Checks
if (!pPacketHdr) return;
if (pPacketHdr->eType != CMemBufferAccessorBase::SPacketHdr::EType::data) return;
if (pPacketHdr->eState != CMemBufferAccessorBase::SPacketHdr::EState::read) return;
m_pPacketHdr = pPacketHdr;
}
CAccessorRxPacket::CAccessorRxPacket(CAccessorRxPacket&& rpacket) noexcept:
m_pAccessor(rpacket.m_pAccessor), m_pPacketHdr(rpacket.m_pPacketHdr)
{
rpacket.m_pAccessor = nullptr;
rpacket.m_pPacketHdr = nullptr;
}
CAccessorRxPacket& CAccessorRxPacket::operator=(CAccessorRxPacket&& rpacket) noexcept
{
m_pAccessor = rpacket.m_pAccessor;
m_pPacketHdr = rpacket.m_pPacketHdr;
rpacket.m_pAccessor = nullptr;
rpacket.m_pPacketHdr = nullptr;
return *this;
}
CAccessorRxPacket::operator bool() const
{
return IsValid();
}
bool CAccessorRxPacket::IsValid() const
{
return GetData() && GetSize();
}
void CAccessorRxPacket::Reset()
{
m_pPacketHdr = nullptr;
}
uint32_t CAccessorRxPacket::GetSize() const
{
return m_pPacketHdr ? m_pPacketHdr->uiSize : 0;
}
const uint8_t* CAccessorRxPacket::GetData() const
{
return reinterpret_cast<const uint8_t*>(m_pPacketHdr ? m_pPacketHdr + 1 : nullptr);
}
void CAccessorRxPacket::Accept()
{
if (!m_pAccessor) return;
// Mark the packet as free
std::unique_lock<std::mutex> lock(m_pAccessor->m_mtxReadAccess);
if (m_pPacketHdr) m_pPacketHdr->eState = CMemBufferAccessorBase::SPacketHdr::EState::free;
lock.unlock();
// Release any read packets
m_pAccessor->ReleasePackets();
}
void CMemBufferAccessorRx::Attach(uint8_t* pBuffer, uint32_t uiSize /*= 0*/)
{
// Restore from a possible previous connection.
CMemBufferAccessorBase::Attach(pBuffer, uiSize);
}
std::optional<CAccessorRxPacket> CMemBufferAccessorRx::TryRead()
{
if (!IsValid()) return {};
// Create a snapshot of the read and write positions
std::unique_lock<std::mutex> lock(m_mtxReadAccess);
uint32_t uiRxPos = m_pHdr->uiRxPos;
uint32_t uiTxPos = m_pHdr->uiTxPos;
// Find the next available data packet that is unread
while (uiRxPos != uiTxPos)
{
// Check for an invalid position
if (uiRxPos > m_pHdr->uiSize)
{
// Should not happen!
uiRxPos -= m_pHdr->uiSize;
continue;
}
// Is there still a header defined until the end of the buffer?
if (m_pHdr->uiSize - uiRxPos < static_cast<uint32_t>(sizeof(SPacketHdr)))
{
// If the uiTxPos is equal or larger, no more header is available at the moment.
if (uiTxPos >= uiRxPos) break;
// Start at the beginning
uiRxPos = 0;
continue;
}
// Get the next packet
SPacketHdr* pPacketHdr = GetPacketHdr(uiRxPos);
// Check the packet state
enum class ENextStep {process, cancel, skip} eNextStep = ENextStep::cancel;
switch (pPacketHdr->eState)
{
case SPacketHdr::EState::commit: // Packet is available and unread; process if packet is data packet.
if (pPacketHdr->eType == SPacketHdr::EType::data)
eNextStep = ENextStep::process;
else
eNextStep = ENextStep::skip;
break;
case SPacketHdr::EState::read: // Packet is in use by other thread. Skip this packet.
case SPacketHdr::EState::free: // Packet is released, but TX has't been updated. Skip this packet.
eNextStep = ENextStep::skip;
break;
case SPacketHdr::EState::reserved: // Packet is not finall written. This should not happen.
default:
eNextStep = ENextStep::cancel;
break;
}
// Do the next step
if (eNextStep == ENextStep::cancel)
break;
if (eNextStep == ENextStep::skip)
{
// Update the read position
uiRxPos = Align(static_cast<uint32_t>(reinterpret_cast<uint8_t*>(pPacketHdr) - m_pBuffer + sizeof(SPacketHdr))
+ pPacketHdr->uiSize);
continue;
}
// Update packet header
pPacketHdr->eState = SPacketHdr::EState::read;
// Return a packet class
return CAccessorRxPacket(*this, pPacketHdr);
}
// Packet not available.
return {};
}
void CMemBufferAccessorRx::ReleasePackets()
{
if (!IsValid()) return;
// Create a snapshot of the read and write positions
std::unique_lock<std::mutex> lock(m_mtxReadAccess);
uint32_t uiRxPos = m_pHdr->uiRxPos;
uint32_t uiTxPos = m_pHdr->uiTxPos;
// Find the next available data packet that is unread
while (uiRxPos != uiTxPos)
{
// Check for an invalid position
if (uiRxPos > m_pHdr->uiSize)
{
// Should not happen!
uiRxPos -= m_pHdr->uiSize;
continue;
}
// Is there still a header defined until the end of the buffer?
if (m_pHdr->uiSize - uiRxPos < static_cast<uint32_t>(sizeof(SPacketHdr)))
{
// If the uiTxPos is equal or larger, no more header is available at the moment.
if (uiTxPos >= uiRxPos) break;
// Start at the beginning
uiRxPos = 0;
continue;
}
// Get the next packet
SPacketHdr* pPacketHdr = GetPacketHdr(uiRxPos);
// Check whether the packet is an unread data packet
if (pPacketHdr->eType == SPacketHdr::EType::data && pPacketHdr->eState != SPacketHdr::EState::free) break;
// Update the read position
uiRxPos = Align(static_cast<uint32_t>(reinterpret_cast<uint8_t*>(pPacketHdr) - m_pBuffer + sizeof(SPacketHdr))
+ pPacketHdr->uiSize);
}
// Store the new position
m_pHdr->uiRxPos = uiRxPos;
// Trigger
TriggerDataReceive();
}
CMemBufferAccessorBase::SPacketHdr* CMemBufferAccessorBase::GetPacketHdr(uint32_t uiPos) const
{
if (!m_pBuffer) return nullptr;
return reinterpret_cast<SPacketHdr*>(m_pBuffer + uiPos);
}
uint8_t* CMemBufferAccessorBase::GetPacketData(uint32_t uiPos) const
{
if (!m_pBuffer) return nullptr;
return m_pBuffer + uiPos + static_cast<uint32_t>(sizeof(SPacketHdr));
}

View File

@@ -0,0 +1,476 @@
#ifndef MEM_BUFFER_ACCESSOR_H
#define MEM_BUFFER_ACCESSOR_H
#include <deque>
#include <mutex>
#include <cassert>
#include <cstring>
#include <iostream>
#include <sstream>
#include <thread>
#include <optional>
#include <interfaces/core_types.h>
/**
* @brief Accessor direction
*/
enum class EAccessType
{
rx, ///< Reading accessor
tx, ///< Writing accessor
};
/**
* @brief Memory buffer accessor.
* @details Lock-free memory buffer accessor. This buffer allows the writing and reading of memory packets in a circular buffer.
* @attention This class does not synchronize calls! It works if only one thread at the time writes data and only one thread at
* the time reads data. Reading and writing can occur simulateously.
* @attention No protection is implemented to monitor the lifetime of the buffer. It is assumed that during any call this buffer
* is valid.
* @remarks This class doesn't provide any handshaking. This is the task of the objects using this class.
*/
class CMemBufferAccessorBase
{
protected:
/**
* @brief Memory buffer header
*/
struct SBufferHdr
{
uint32_t uiVersion = SDVFrameworkInterfaceVersion; ///< Check for compatible communication channel
uint32_t uiSize = 0u; ///< The size of the buffer
uint32_t uiTxPos = 0u; ///< Current write position
uint32_t uiRxPos = 0u; ///< Current read position
};
public:
/**
* @brief Packet header
* @details The packet header is placed in front of every packet and describes the packet type as well as the size of the
* packet.
* @remarks Packets are always starting at a 64-bit boundary.
*/
struct SPacketHdr
{
/**
* @brief Packet types
*/
enum class EType : uint16_t
{
data = 0, ///< The packet contains data
stuffing = 1, ///< The packet doesn't contain data and is used to fill up space
} eType; ///< Packet type
/**
* @brief Packet state
*/
enum class EState : uint16_t
{
free = 0, ///< Packet is not reserved; overwriting is allowed
reserved = 1, ///< Space is reserved but not committed.
commit = 2, ///< Packet is committed and available for reading.
read = 3, ///< Packet is currently being read.
} eState; ///< The packet state
uint32_t uiSize; ///< Packet size
};
/**
* @brief Constructor
*/
CMemBufferAccessorBase() = default;
/**
* @brief Attach the buffer to the accessor.
* @param[in] pBuffer Pointer to the buffer that should be accessed.
* @param[in] uiSize Size of the buffer when still uninitialized (during buffer creation); otherwise 0.
*/
virtual void Attach(uint8_t* pBuffer, uint32_t uiSize = 0);
/**
* @brief Detach the buffer from the accessor.
*/
virtual void Detach();
/**
* @brief Is valid.
* @return Returns 'true' when valid; otherwise 'false'.
*/
bool IsValid() const;
/**
* @brief Get the buffer pointer.
* @return Returns a pointer to the buffer or NULL when there is no buffer.
*/
const uint8_t* GetBufferPointer() const;
/**
* @brief Reset the Rx position to skip any sent data. Typically needed when a reconnect should take place.
*/
void ResetRx() { if (m_pHdr) m_pHdr->uiRxPos = m_pHdr->uiTxPos; m_bCancel = false; }
/**
* @brief Cancel send operation.
*/
void CancelSend() { m_bCancel = true; TriggerDataReceive(); std::this_thread::sleep_for(std::chrono::milliseconds(10)); }
/**
* @brief Canceled?
* @return Returns 'true' when the currend send job has been canceled; 'false' when not.
*/
bool Canceled() const { return m_bCancel; }
protected:
/**
* @brief Providing the position, return the packet header preceding the data.
* @param[in] uiPos The position in the buffer
* @return Pointer to the packet header or NULL when the position isn't within the buffer.
*/
SPacketHdr* GetPacketHdr(uint32_t uiPos) const;
/**
* @brief Providing the position, return the packet data following the header.
* @param[in] uiPos The position in the buffer
* @return Pointer to the packet data or NULL when the position isn't within the buffer.
*/
uint8_t* GetPacketData(uint32_t uiPos) const;
/**
* @brief Has unread data.
* @return Returns 'true' when unread data is still available; otherwise returns 'false'.
*/
virtual bool HasUnreadData() const { return m_pHdr && m_pHdr->uiRxPos != m_pHdr->uiTxPos; }
/**
* @brief Trigger listener that a write operation was completed.
*/
virtual void TriggerDataSend() = 0;
/**
* @brief Wait for a write operation to be completed.
* @param[in] uiTimeoutMs The amount of time (in ms) to wait for a trigger.
* @return Returns 'true' when data was stored, 'false' when a timeout occurred.
*/
virtual bool WaitForData(uint32_t uiTimeoutMs) const = 0;
/**
* @brief Trigger listener that a read operation was completed.
*/
virtual void TriggerDataReceive() = 0;
/**
* @brief Wait for a read operation to be completed.
* @param[in] uiTimeoutMs The amount of time (in ms) to wait for a trigger.
* @return Returns 'true' when data was stored, 'false' when a timeout occurred.
*/
virtual bool WaitForFreeSpace(uint32_t uiTimeoutMs) const = 0;
/**
* @brief Align to 64 bits
* @tparam T The type of the size argument.
* @param[in] tSize The current size.
* @return The 64-bit aligned size.
*/
template <typename T>
inline uint32_t Align(T tSize)
{
uint32_t uiSize = static_cast<uint32_t>(tSize);
return (uiSize % 8) ? uiSize + 8 - uiSize % 8 : uiSize;
}
uint8_t* m_pBuffer = nullptr; ///< Buffer pointer
SBufferHdr* m_pHdr = nullptr; ///< Buffer header
bool m_bCancel = false; ///< Cancel the send operation
};
// Forward declaration
class CMemBufferAccessorTx;
/**
* @brief Tx access can be done randomly. To prevent shutting down while packets are reserved but not committed, the accessor
* packet take over the management of this.
*/
class CAccessorTxPacket
{
public:
/**
* @brief Default constructor
*/
CAccessorTxPacket() = default;
/**
* @brief Constructor
* @param[in] rAccessor Reference to the accessor managing the buffer.
* @param[in] pPacketHdr Pointer to the packet header.
*/
CAccessorTxPacket(CMemBufferAccessorTx& rAccessor, CMemBufferAccessorBase::SPacketHdr* pPacketHdr);
/**
* @brief Copy constructor is deleted.
* @param[in] rpacket Reference to the packet to copy from.
*/
CAccessorTxPacket(const CAccessorTxPacket& rpacket) = delete;
/**
* @brief Move constructor
* @param[in] rpacket Reference to the packet to move from.
*/
CAccessorTxPacket(CAccessorTxPacket&& rpacket) noexcept;
/**
* @brief Destructor - will automatically commit the packet if not done so already.
*/
~CAccessorTxPacket();
/**
* @brief Copy operator is deleted.
* @param[in] rpacket Reference to the packet to copy from.
* @return Reference to this packet class.
*/
CAccessorTxPacket& operator=(const CAccessorTxPacket& rpacket) = delete;
/**
* @brief Copy operator is deleted.
* @param[in] rpacket Reference to the packet to copy from.
* @return Reference to this packet class.
*/
CAccessorTxPacket& operator=(CAccessorTxPacket&& rpacket) noexcept;
/**
* @brief Cast operator used to check validity.
*/
operator bool() const;
/**
* @brief Does the packet contain valid data (data is not committed yet).
* @return The validity of the packet.
*/
bool IsValid() const;
/**
* @brief Commit the data. This will invalidate the content.
*/
void Commit();
/**
* @brief Get the size of the packet data.
* @return The packet size; could be 0 when no data was allocated.
*/
uint32_t GetSize() const;
/**
* @brief Get the packet pointer.
* @return Pointer to the buffer holding the packet data or nullptr when there is no data allocated or the packet has been
* committed.
*/
uint8_t* GetDataPtr();
/**
* @brief Templated version of GetData.
* @tparam TData The type to cast the data pointer to.
* @param[in] uiOffset The offset to use in bytes (default no offset).
* @return Pointer to the data or NULL when the data doesn't fit in the buffer.
*/
template <typename TData>
TData* GetDataPtr(uint32_t uiOffset = 0)
{
return GetSize() >= (uiOffset + sizeof(TData)) ? reinterpret_cast<TData*>(GetDataPtr()) : nullptr;
}
private:
CMemBufferAccessorTx* m_pAccessor = nullptr; ///< Pointer to the accessor.
CMemBufferAccessorBase::SPacketHdr* m_pPacketHdr = nullptr; ///< Packet header.
};
/**
* @brief Accessor TX implementation
*/
class CMemBufferAccessorTx : public CMemBufferAccessorBase
{
friend CAccessorTxPacket;
public:
/**
* @brief Destructor
*/
~CMemBufferAccessorTx();
/**
* @brief Reserve memory for writing to the buffer without an extra copy.
* @param[in] uiSize Size of the memory block to reserve. Must be smaller than the buffer size.
* @param[in] uiTimeoutMs The amount of time (in ms) to wait for a free buffer. Default 1000 ms (needed when allocating large
* chunks of data that need to be allocated on the receiving side).
* @return Returns a packet object if successful or an empty option when a timeout occurred.
* @attention The reader will only be able to continue reading after a commit. If, for some reason, there is no commit, the
* reader won't be able to retrieve any more data.
* @attention Performance is poor if the size is close to the buffer size. As a rule, uiSize should be at the most 1/4th of
* the buffer size.
*/
std::optional<CAccessorTxPacket> Reserve(uint32_t uiSize, uint32_t uiTimeoutMs = 1000);
/**
* @brief Write data to the buffer.
* @param[in] pData Pointer to the data to write.
* @param[in] uiSize Length of the data in bytes.
* @return Returns 'true' if writing was successful or 'false' if the packet was invalid or (currently) doesn't fit into the
* buffer.
*/
bool TryWrite(const void* pData, uint32_t uiSize);
/**
* @brief Get the accessor access type.
* @return The access type of this accessor.
*/
static constexpr EAccessType GetAccessType() { return EAccessType::tx; };
private:
/**
* @brief Commit the writing to the buffer.
* @param[in] pPacketHdr Pointer to the previously reserved packet header.
*/
void Commit(SPacketHdr* pPacketHdr);
bool m_bBlockReserve = false; ///< When set, do not reserve any more packets.
std::mutex m_mtxReservedPackes; ///< Access protection for the reserved access queue.
std::deque<SPacketHdr*> m_queueReservedPackets; ///< Queue containing the currently reserved packets.
};
// Forward declaration
class CMemBufferAccessorRx;
/**
* @brief Rx access can be done randomly. To ensure the the data is released properly, the accessor packet is returned to manage
* this.
*/
class CAccessorRxPacket
{
public:
/**
* @brief Default constructor
*/
CAccessorRxPacket() = default;
/**
* @brief Constructor
* @param[in] rAccessor Reference to the accessor managing the buffer.
* @param[in] pPacketHdr Pointer to the packet header.
*/
CAccessorRxPacket(CMemBufferAccessorRx& rAccessor, CMemBufferAccessorBase::SPacketHdr* pPacketHdr);
/**
* @brief Copy constructor is deleted.
* @param[in] rpacket Reference to the packet to copy from.
*/
CAccessorRxPacket(const CAccessorRxPacket& rpacket) = delete;
/**
* @brief Move constructor
* @param[in] rpacket Reference to the packet to move from.
*/
CAccessorRxPacket(CAccessorRxPacket&& rpacket) noexcept;
/**
* @brief Copy operator is deleted.
* @param[in] rpacket Reference to the packet to copy from.
* @return Reference to this packet class.
*/
CAccessorRxPacket& operator=(const CAccessorRxPacket& rpacket) = delete;
/**
* @brief Copy operator is deleted.
* @param[in] rpacket Reference to the packet to copy from.
* @return Reference to this packet class.
*/
CAccessorRxPacket& operator=(CAccessorRxPacket&& rpacket) noexcept;
/**
* @brief Cast operator used to check validity.
*/
operator bool() const;
/**
* @brief Does the packet contain valid data (data pointer is available and the size is not zero).
* @return The validity of the packet.
*/
bool IsValid() const;
/**
* @brief Reset the packet (not accepting the content). Packet will be available again for next request.
*/
void Reset();
/**
* @brief Get the size of the packet data.
* @return The packet size.
*/
uint32_t GetSize() const;
/**
* @brief Get the packet pointer.
* @return Pointer to the buffer holding the packet data or nullptr when the packet is cleared.
*/
const uint8_t* GetData() const;
/**
* @brief Templated version of GetData.
* @tparam TData The type to cast the data pointer to.
* @param[in] uiOffset The offset to use in bytes (default no offset).
* @return Pointer to the data or NULL when the data doesn't fit in the buffer.
*/
template <typename TData>
const TData* GetData(uint32_t uiOffset = 0) const
{
return GetSize() >= (uiOffset + sizeof(TData)) ? reinterpret_cast<const TData*>(GetData()) : nullptr;
}
/**
* @brief Accept the packet; packet will be released for new packet data.
*/
void Accept();
private:
CMemBufferAccessorRx* m_pAccessor = nullptr; ///< Pointer to the accessor.
CMemBufferAccessorBase::SPacketHdr* m_pPacketHdr = nullptr; ///< Packet header.
};
/**
* @brief Accessor RX implementation
*/
class CMemBufferAccessorRx : public CMemBufferAccessorBase
{
friend CAccessorRxPacket;
public:
// Suppress cppcheck warning about a useless override. The function is here for clearer code documentation.
// cppcheck-suppress uselessOverride
/**
* @brief Attach the buffer to the accessor. Overload of CMemBufferAccessorBase::Attach.
* @param[in] pBuffer Pointer to the buffer that should be accessed.
* @param[in] uiSize Size of the buffer when still uninitialized (during buffer creation); otherwise 0.
*/
virtual void Attach(uint8_t* pBuffer, uint32_t uiSize = 0) override;
/**
* @brief Get a packet. If the packet is accepted, call release before the packet
* @return Returns 'true' when the access request was successful or 'false' when currently there is no data.
* @attention The writer will only be able to overwrite previously read data after it is released. If, for some reason there is
* no release, the writer won't be able to write past the accessed packet.
*/
std::optional<CAccessorRxPacket> TryRead();
/**
* @brief Get the accessor access type.
* @return The access type of this accessor.
*/
static constexpr EAccessType GetAccessType() { return EAccessType::rx; };
protected:
/**
* @brief Release any read packets and update the read pointer.
*/
void ReleasePackets();
private:
std::mutex m_mtxReadAccess; ///< Synchronize read access.
};
#endif // !defined MEM_BUFFER_ACCESSOR_H

View File

@@ -0,0 +1,466 @@
#if !defined POSIX_SHARED_MEM_BUFFER_H && defined __unix__
#define POSIX_SHARED_MEM_BUFFER_H
#include <cassert>
#include <stdio.h>
#include <sys/mman.h>
#include <fcntl.h>
#include <sys/stat.h>
#include <semaphore.h>
#include <unistd.h>
#include <string.h>
#include <support/toml.h>
#include "../../global/trace.h"
#include "mem_buffer_accessor.h"
/**
* @brief In-process memory buffer.
*/
template <class TAccessor>
class CSharedMemBuffer : public TAccessor
{
public:
/**
* @brief Default constructor
* @param[in] uiSize Optional size of the buffer. If zero, a default buffer size of 10k is configured.
* @param[in] rssName Optional name to be used for the connection. If empty, a random name is generated.
* @param[in] bServer Optional boolean indicating whether the connection is a server (true), which initiates the connection, or
* a client (false), which opens an existing connection.
*/
CSharedMemBuffer(uint32_t uiSize = 0, const std::string& rssName = std::string(), bool bServer = true);
/**
* @brief Connection constructor
* @param[in] rssConnectionString Reference to string with connection information.
*/
CSharedMemBuffer(const std::string& rssConnectionString);
/** No copy constructor */
CSharedMemBuffer(const CSharedMemBuffer&) = delete;
/** No move constructor */
CSharedMemBuffer(CSharedMemBuffer&&) = delete;
/**
* \brief Default destructor
*/
~CSharedMemBuffer();
/** No copy-assignment operator */
CSharedMemBuffer& operator=(const CSharedMemBuffer&) = delete;
/** No move-assignment operator */
CSharedMemBuffer& operator=(CSharedMemBuffer&&) = delete;
/**
* @brief Detach the buffer. Overload of CMemBufferAccessorBase::Detach.
* @details The detach function detaches the shared memory without deleting the memory. This keeps the memory alive for reuse.
*/
virtual void Detach() override;
/**
* @brief Return the connection string to connect to this shared memory.
* @return The connection string to connect to this buffer.
*/
std::string GetConnectionString() const;
/**
* @brief Trigger listener that a write operation was completed.
*/
void TriggerDataSend() override;
/**
* @brief Wait for a write operation to be completed.
* @param[in] uiTimeoutMs The amount of time (in ms) to wait for a trigger.
* @return Returns 'true' when data was stored, 'false' when a timeout occurred.
*/
bool WaitForData(uint32_t uiTimeoutMs) const override;
/**
* @brief Trigger listener that a read operation was completed.
*/
void TriggerDataReceive() override;
/**
* @brief Wait for a read operation to be completed.
* @param[in] uiTimeoutMs The amount of time (in ms) to wait for a trigger.
* @return Returns 'true' when data was stored, 'false' when a timeout occurred.
*/
bool WaitForFreeSpace(uint32_t uiTimeoutMs) const override;
/**
* @brief Return the last reported error.
* @return Error string.
*/
std::string GetError() const { return m_ssError; }
/**
* @brief Get the size of the buffer.
* @return Returns the size of the buffer.
*/
uint32_t GetSize() const { return m_uiSize; }
/**
* @brief Get the name of the buffer.
* @return Returns the name of the buffer.
*/
std::string GetName() const { return m_ssName; }
private:
uint32_t m_uiSize = 0u; ///< Size of the shared memory buffer.
int m_iFileDescr = 0; ///< File descriptor of the shared memory.
std::string m_ssName; ///< Name of the shared memory.
uint8_t* m_pBuffer = nullptr; ///< Pointer to the mapped buffer.
std::string m_ssSyncTx; ///< Name of the signalling event.
sem_t* m_pSemaphoreTx = nullptr; ///< Semaphore to trigger written.
std::string m_ssSyncRx; ///< Name of the signalling event.
sem_t* m_pSemaphoreRx = nullptr; ///< Semaphore to trigger written.
std::string m_ssError; ///< The last reported error.
bool m_bServer = false; ///< Set when the shared memory is configured as server. Otherwise as client.
};
/**
* @brief Shared memory buffer used for reading.
*/
using CSharedMemBufferRx = CSharedMemBuffer<CMemBufferAccessorRx>;
/**
* @brief Shared memory buffer used for writing.
*/
using CSharedMemBufferTx = CSharedMemBuffer<CMemBufferAccessorTx>;
template <class TAccessor>
inline CSharedMemBuffer<TAccessor>::CSharedMemBuffer(uint32_t uiSize /*= 0*/, const std::string& rssName /*= std::string()*/,
bool bServer /*= true*/) : m_uiSize(bServer ? (uiSize ? uiSize : 128 * 1024) : 0), m_bServer(bServer)
{
// Create a name to be used in the connection string
std::string ssDirectionString;
if (bServer)
ssDirectionString = TAccessor::GetAccessType() == EAccessType::rx ? "RESPONSE_" : "REQUEST_";
else
ssDirectionString = TAccessor::GetAccessType() == EAccessType::rx ? "REQUEST_" : "RESPONSE_";
if (!rssName.empty())
{
m_ssName = std::string("SDV_SHARED_") + ssDirectionString + rssName;
m_ssSyncTx = std::string("SDV_TX_SYNC_") + ssDirectionString + rssName;
m_ssSyncRx = std::string("SDV_RX_SYNC_") + ssDirectionString + rssName;
}
else
{
uint64_t uiCnt = std::chrono::high_resolution_clock::now().time_since_epoch().count();
m_ssName = std::string("SDV_SHARED_") + ssDirectionString + std::to_string(uiCnt);
m_ssSyncTx = std::string("SDV_TX_SYNC_") + ssDirectionString + std::to_string(uiCnt);
m_ssSyncRx = std::string("SDV_RX_SYNC_") + ssDirectionString + std::to_string(uiCnt);
}
// Create a path
std::string ssNamePath = "/" + m_ssName;
// std::string ssSyncTxPath = "/" + m_ssSyncTx;
// std::string ssSyncRxPath = "/" + m_ssSyncRx;
// Unlink just in case the last server had crashed and the mapping still exists.
if (m_bServer)
{
shm_unlink((std::string("/") + m_ssName).c_str());
sem_unlink(m_ssSyncTx.c_str());
sem_unlink(m_ssSyncRx.c_str());
}
// Initialize the semaphores
if (bServer)
{
m_pSemaphoreTx = sem_open(m_ssSyncTx.c_str(), O_CREAT | O_EXCL, 0777 /*O_RDWR*/, 0);
if (!m_pSemaphoreTx || m_pSemaphoreTx == SEM_FAILED)
{
m_ssError = "Failed to create new semaphore " + m_ssSyncTx + ".";
return;
}
m_pSemaphoreRx = sem_open(m_ssSyncRx.c_str(), O_CREAT | O_EXCL, 0777 /*O_RDWR*/, 0);
if (!m_pSemaphoreRx || m_pSemaphoreRx == SEM_FAILED)
{
m_ssError = "Failed to create new semaphore " + m_ssSyncRx + ".";
return;
}
}
else
{
m_pSemaphoreTx = sem_open(m_ssSyncTx.c_str(), 0);
if (!m_pSemaphoreTx || m_pSemaphoreTx == SEM_FAILED)
{
m_ssError = "Failed to open existing semaphore " + m_ssSyncTx + ".";
return;
}
m_pSemaphoreRx = sem_open(m_ssSyncRx.c_str(), 0);
if (!m_pSemaphoreRx || m_pSemaphoreRx == SEM_FAILED)
{
m_ssError = "Failed to open existing semaphore " + m_ssSyncRx + ".";
return;
}
}
// Get shared memory file descriptor (NOT a file)
if (bServer)
{
m_iFileDescr = shm_open(ssNamePath.c_str(), O_RDWR | O_CREAT, S_IRUSR | S_IWUSR);
if (m_iFileDescr == -1)
{
m_ssError = "Failed to create the shared memory file descriptor " + ssNamePath + ".";
return;
}
// Extend shared memory object as by default it's initialized with size 0
int iResult = ftruncate(m_iFileDescr, m_uiSize);
if (iResult == -1)
{
m_ssError = "Failed to extend the shared memory.";
return;
}
}
else
{
m_iFileDescr = shm_open(ssNamePath.c_str(), O_RDWR, S_IRUSR | S_IWUSR);
if (m_iFileDescr == -1)
{
m_ssError = "Failed to open the shared memory file descriptor " + ssNamePath + ".";
return;
}
// Get the size of the shared memory
struct stat sMemInfo{};
if (fstat(m_iFileDescr, &sMemInfo) == -1 || !sMemInfo.st_size)
{
m_ssError = "Failed to request the size of the shared memory file descriptor " + ssNamePath + ".";
return;
}
m_uiSize = static_cast<uint32_t>(sMemInfo.st_size);
}
// map shared memory to process address space
m_pBuffer = reinterpret_cast<uint8_t*>(mmap(NULL, m_uiSize, PROT_READ | PROT_WRITE, MAP_SHARED, m_iFileDescr, 0));
if (!m_pBuffer || m_pBuffer == MAP_FAILED)
{
m_ssError = "Failed to map the shared memory in process address space.";
return;
}
// If this is a server, the size causes the initialization. For a client, no initialization should take place (the server has
// done so already).
TAccessor::Attach(m_pBuffer, bServer ? m_uiSize : 0);
TRACE("Accessed shared memory for ", m_ssSyncTx, " and ", m_ssSyncRx, ".");
}
template <class TAccessor>
inline CSharedMemBuffer<TAccessor>::CSharedMemBuffer(const std::string& rssConnectionString)
{
if (rssConnectionString.empty())
{
m_ssError = "Missing connection string.";
return;
}
// Interpret the connection string
sdv::toml::CTOMLParser config(rssConnectionString);
// The connection string can contain multiple parameters. Search for the first parameters fitting the accessor direction
size_t nIndex = 0;
sdv::toml::CNodeCollection nodeConnectParamCollection = config.GetDirect("ConnectParam");
do
{
sdv::toml::CNodeCollection nodeConnectParam;
switch (nodeConnectParamCollection.GetType())
{
case sdv::toml::ENodeType::node_array:
if (nIndex >= nodeConnectParamCollection.GetCount()) break;
nodeConnectParam = nodeConnectParamCollection[nIndex];
break;
case sdv::toml::ENodeType::node_table:
if (nIndex > 0) break;
nodeConnectParam = nodeConnectParamCollection;
break;
default:
break;
}
if (nodeConnectParam.GetType() != sdv::toml::ENodeType::node_table) break;
nIndex++;
// Check for shared memory
if (nodeConnectParam.GetDirect("Type").GetValue() != "shared_mem") continue;
// Check the direction
if (nodeConnectParam.GetDirect("Direction").GetValue() !=
(TAccessor::GetAccessType() == EAccessType::rx ? "response" : "request"))
continue;
// Get the information
m_ssName = static_cast<std::string>(nodeConnectParam.GetDirect("Location").GetValue());
m_ssSyncTx = static_cast<std::string>(nodeConnectParam.GetDirect("SyncTx").GetValue());
m_ssSyncRx = static_cast<std::string>(nodeConnectParam.GetDirect("SyncRx").GetValue());
break;
} while (true);
if (m_ssName.empty() || m_ssSyncTx.empty() || m_ssSyncRx.empty())
{
m_ssError = "Incomplete connection information.";
return;
}
// Create a path
std::string ssPath = "/" + m_ssName;
// std::string ssSyncTxPath = "/" + m_ssSyncTx;
// std::string ssSyncRxPath = "/" + m_ssSyncRx;
// Get shared memory file descriptor (NOT a file)
m_iFileDescr = shm_open(ssPath.c_str(), O_RDWR, S_IRUSR | S_IWUSR);
if (m_iFileDescr == -1)
{
m_ssError = "Failed to open the shared memory file descriptor " + ssPath + ".";
return;
}
// Get the size of the shared memory
struct stat sMemInfo{};
if (fstat(m_iFileDescr, &sMemInfo) == -1 || !sMemInfo.st_size)
{
m_ssError = "Failed to request the size of the shared memory file descriptor " + ssPath + ".";
return;
}
m_uiSize = static_cast<uint32_t>(sMemInfo.st_size);
// Map shared memory to process address space
m_pBuffer = reinterpret_cast<uint8_t*>(mmap(NULL, m_uiSize, PROT_READ | PROT_WRITE, MAP_SHARED, m_iFileDescr, 0));
if (!m_pBuffer || m_pBuffer == MAP_FAILED)
{
m_ssError = "Failed to map the shared memory in process address space.";
return;
}
// Initialize the semaphore
m_pSemaphoreTx = sem_open(m_ssSyncTx.c_str(), 0);
if (!m_pSemaphoreTx || m_pSemaphoreTx == SEM_FAILED)
{
m_ssError = "Failed to open existing semaphore " + m_ssSyncTx + ".";
return;
}
m_pSemaphoreRx = sem_open(m_ssSyncRx.c_str(), 0);
if (!m_pSemaphoreRx || m_pSemaphoreRx == SEM_FAILED)
{
m_ssError = "Failed to open existing semaphore " + m_ssSyncRx + ".";
return;
}
TAccessor::Attach(m_pBuffer);
TRACE("Opened shared memory for ", m_ssSyncTx, " and ", m_ssSyncRx, ".");
}
template <class TAccessor>
CSharedMemBuffer<TAccessor>::~CSharedMemBuffer()
{
// ATTENTION unmapping and unlinking will remove any connection to the shared memory within this process. When multiple
// accessors are used, this will invalidate them immediately.
if (m_pBuffer && m_pBuffer != MAP_FAILED)
munmap(m_pBuffer, m_uiSize);
if (m_bServer && !m_ssName.empty())
shm_unlink((std::string("/") + m_ssName).c_str());
if (m_bServer && m_pSemaphoreTx)
sem_unlink(m_ssSyncTx.c_str());
if (m_bServer && m_pSemaphoreRx)
sem_unlink(m_ssSyncRx.c_str());
}
template <class TAccessor>
void CSharedMemBuffer<TAccessor>::Detach()
{
m_uiSize = 0u;
m_iFileDescr = 0;
m_ssName.clear();
m_pBuffer = nullptr;
m_ssSyncTx.clear();
m_pSemaphoreTx = nullptr;
m_ssSyncRx.clear();
m_pSemaphoreRx = nullptr;
m_ssError.clear();
}
template <class TAccessor>
inline std::string CSharedMemBuffer<TAccessor>::GetConnectionString() const
{
// The connection string contains the TOML file for connecting to this shared memory.
std::stringstream sstream;
sstream << "[[ConnectParam]]" << std::endl;
sstream << "Type = \"shared_mem\"" << std::endl;
sstream << "Location = \"" << m_ssName << "\"" << std::endl;
sstream << "SyncTx = \"" << m_ssSyncTx << "\"" << std::endl;
sstream << "SyncRx = \"" << m_ssSyncRx << "\"" << std::endl;
// The target direction is the opposite of the direction of the accessor. Therefore, if the accessor uses an RX access type,
// the target uses an TX access type and should be configured as response, otherwise it is a request.
sstream << "Direction = \"" << (TAccessor::GetAccessType() == EAccessType::rx ? "request" : "response") << "\"" << std::endl;
return sstream.str();
}
template <class TAccessor>
inline void CSharedMemBuffer<TAccessor>::TriggerDataSend()
{
if (!m_pSemaphoreTx || m_pSemaphoreTx == SEM_FAILED)
return;
sem_post(m_pSemaphoreTx);
}
template <class TAccessor>
inline bool CSharedMemBuffer<TAccessor>::WaitForData(uint32_t uiTimeoutMs) const
{
if (!m_pSemaphoreTx || m_pSemaphoreTx == SEM_FAILED)
return false;
// Check whether there is data; if so, return true.
if (TAccessor::HasUnreadData())
return true;
// Get the time from the realtime clock
timespec sTimespec{};
if (clock_gettime(CLOCK_REALTIME, &sTimespec) == -1)
return false;
uint64_t uiTimeNs = sTimespec.tv_nsec + uiTimeoutMs * 1000000ull;
sTimespec.tv_nsec = uiTimeNs % 1000000000ull;
sTimespec.tv_sec += uiTimeNs / 1000000000ull;
// Wait for the semaphore
int iResult = sem_timedwait(m_pSemaphoreTx, &sTimespec);
if (iResult < 0) return false;
return true;
}
template <class TAccessor>
inline void CSharedMemBuffer<TAccessor>::TriggerDataReceive()
{
if (!m_pSemaphoreRx || m_pSemaphoreRx == SEM_FAILED)
return;
sem_post(m_pSemaphoreRx);
}
template <class TAccessor>
inline bool CSharedMemBuffer<TAccessor>::WaitForFreeSpace(uint32_t uiTimeoutMs) const
{
if (!m_pSemaphoreRx || m_pSemaphoreRx == SEM_FAILED)
return false;
// Get the time from the realtime clock
timespec sTimespec{};
if (clock_gettime(CLOCK_REALTIME, &sTimespec) == -1)
return false;
uint64_t uiTimeNs = sTimespec.tv_nsec + uiTimeoutMs * 1000000ull;
sTimespec.tv_nsec = uiTimeNs % 1000000000ull;
sTimespec.tv_sec += uiTimeNs / 1000000000ull;
// Wait for the semaphore
int iResult = sem_timedwait(m_pSemaphoreRx, &sTimespec);
if (iResult < 0)
return false;
if (TAccessor::Canceled())
return false;
return true;
}
#endif // !defined POSIX_SHARED_MEM_BUFFER_H

View File

@@ -0,0 +1,509 @@
#if !defined WINDOWS_SHARED_MEM_BUFFER_H && defined _WIN32
#define WINDOWS_SHARED_MEM_BUFFER_H
#include <cassert>
// Resolve conflict
#pragma push_macro("interface")
#undef interface
#ifndef NOMINMAX
#define NOMINMAX
#endif
#include <WinSock2.h>
#include <Windows.h>
// Resolve conflict
#pragma pop_macro("interface")
#ifdef GetClassInfo
#undef GetClassInfo
#endif
#include <support/string.h>
#include <support/toml.h>
#include "../../global/trace.h"
#include "mem_buffer_accessor.h"
/**
* @brief In-process memory buffer.
*/
template <class TAccessor>
class CSharedMemBuffer : public TAccessor
{
public:
/**
* @brief Default constructor
* @param[in] uiSize Optional size of the buffer. If zero, a default buffer size of 10k is configured.
* @param[in] rssName Optional name to be used for the connection. If empty, a random name is generated.
* @param[in] bServer Optional boolean indicating whether the connection is a server (true), which initiates the connection, or
* a client (false), which opens an existing connection.
*/
CSharedMemBuffer(uint32_t uiSize = 0, const std::string& rssName = std::string(), bool bServer = true);
/**
* @brief Connection constructor
* @param[in] rssConnectionString Reference to string with connection information.
*/
CSharedMemBuffer(const std::string& rssConnectionString);
/** No copy constructor */
CSharedMemBuffer(const CSharedMemBuffer&) = delete;
/** No move constructor */
CSharedMemBuffer(CSharedMemBuffer&&) = delete;
/**
* \brief Default destructor
*/
~CSharedMemBuffer();
/** No copy-assignment operator */
CSharedMemBuffer& operator=(const CSharedMemBuffer&) = delete;
/** No move-assignment operator */
CSharedMemBuffer& operator=(CSharedMemBuffer&&) = delete;
/**
* @brief Detach the buffer. Overload of CMemBufferAccessorBase::Detach.
* @details The detach function detaches the shared memory without deleting the memory. This keeps the memory alive for reuse.
*/
virtual void Detach() override;
/**
* @brief Return the connection string to connect to this shared memory.
* @return The connection string to connect to this buffer.
*/
std::string GetConnectionString() const;
/**
* @brief Trigger listener that a write operation was completed.
*/
void TriggerDataSend() override;
/**
* @brief Wait for a write operation to be completed.
* @param[in] uiTimeoutMs The amount of time (in ms) to wait for a trigger.
* @return Returns 'true' when data was stored, 'false' when a timeout occurred.
*/
bool WaitForData(uint32_t uiTimeoutMs) const override;
/**
* @brief Trigger listener that a read operation was completed.
*/
void TriggerDataReceive() override;
/**
* @brief Wait for a read operation to be completed.
* @param[in] uiTimeoutMs The amount of time (in ms) to wait for a trigger.
* @return Returns 'true' when data was stored, 'false' when a timeout occurred.
*/
bool WaitForFreeSpace(uint32_t uiTimeoutMs) const override;
/**
* @brief Return the last reported error.
* @return Reference to the error string.
*/
const std::string& GetError() const { return m_ssError; }
/**
* @brief Get the size of the buffer.
* @return Returns the size of the buffer.
*/
uint32_t GetSize() const { return m_uiSize; }
/**
* @brief Get the name of the buffer.
* @return Reference to the string holding the name of the buffer.
*/
const std::string& GetName() const { return m_ssName; }
private:
uint32_t m_uiSize = 0u; ///< Size of the shared memory buffer.
HANDLE m_hMapFile = INVALID_HANDLE_VALUE; ///< Handle to the shared memory buffer.
std::string m_ssName; ///< Name of the shared memory.
uint8_t* m_pBuffer = nullptr; ///< Pointer to the mapped buffer.
std::string m_ssSyncTx; ///< Name of the signalling event.
HANDLE m_hSignalTx = INVALID_HANDLE_VALUE; ///< Handle to the signalling event.
std::string m_ssSyncRx; ///< Name of the signalling event.
HANDLE m_hSignalRx = INVALID_HANDLE_VALUE; ///< Handle to the signalling event.
std::string m_ssError; ///< The last reported error.
};
/**
* @brief Shared memory buffer used for reading.
*/
using CSharedMemBufferRx = CSharedMemBuffer<CMemBufferAccessorRx>;
/**
* @brief Shared memory buffer used for writing.
*/
using CSharedMemBufferTx = CSharedMemBuffer<CMemBufferAccessorTx>;
template <class TAccessor>
inline CSharedMemBuffer<TAccessor>::CSharedMemBuffer(uint32_t uiSize /*= 0*/, const std::string& rssName /*= std::string()*/,
bool bServer /*= true*/) : m_uiSize(bServer ? (uiSize ? uiSize : 128 * 1024) : 0)
{
// Create a name to be used in the connection string
std::string ssDirectionString;
if (bServer)
ssDirectionString = TAccessor::GetAccessType() == EAccessType::rx ? "RESPONSE_" : "REQUEST_";
else
ssDirectionString = TAccessor::GetAccessType() == EAccessType::rx ? "REQUEST_" : "RESPONSE_";
if (!rssName.empty())
{
m_ssName = std::string("SDV_SHARED_") + ssDirectionString + rssName;
m_ssSyncTx = std::string("SDV_TX_SYNC_") + ssDirectionString + rssName;
m_ssSyncRx = std::string("SDV_RX_SYNC_") + ssDirectionString + rssName;
}
else
{
uint64_t uiCnt = std::chrono::high_resolution_clock::now().time_since_epoch().count();
m_ssName = std::string("SDV_SHARED_") + ssDirectionString + std::to_string(uiCnt);
m_ssSyncTx = std::string("SDV_TX_SYNC_") + ssDirectionString + std::to_string(uiCnt);
m_ssSyncRx = std::string("SDV_RX_SYNC_") + ssDirectionString + std::to_string(uiCnt);
}
// Create a path
std::string ssNamePath = /*"Global\\" +*/ m_ssName;
std::string ssSyncTxPath = /*"Global\\" +*/ m_ssSyncTx;
std::string ssSyncRxPath = /*"Global\\" +*/ m_ssSyncRx;
auto fnReportWin32Error = [this]()
{
TCHAR* szMsg = nullptr;
FormatMessage(FORMAT_MESSAGE_ALLOCATE_BUFFER | FORMAT_MESSAGE_FROM_SYSTEM | FORMAT_MESSAGE_IGNORE_INSERTS,
NULL,
GetLastError(),
MAKELANGID(LANG_ENGLISH, SUBLANG_ENGLISH_US),
(LPTSTR) &szMsg,
0,
NULL);
m_ssError += sdv::MakeUtf8String(szMsg);
LocalFree(szMsg);
};
auto fnCloseAll = [this]()
{
if (m_pBuffer) UnmapViewOfFile(m_pBuffer);
if (m_hMapFile && m_hMapFile != INVALID_HANDLE_VALUE) CloseHandle(m_hMapFile);
if (m_hSignalTx && m_hSignalTx != INVALID_HANDLE_VALUE) CloseHandle(m_hSignalTx);
if (m_hSignalRx && m_hSignalRx != INVALID_HANDLE_VALUE) CloseHandle(m_hSignalRx);
m_pBuffer = 0;
m_hMapFile = INVALID_HANDLE_VALUE;
m_hSignalTx = INVALID_HANDLE_VALUE;
m_hSignalRx = INVALID_HANDLE_VALUE;
};
// Create sync event object
m_hSignalTx = CreateEventA(nullptr, FALSE, FALSE, ssSyncTxPath.c_str());
if (!m_hSignalTx || m_hSignalTx == INVALID_HANDLE_VALUE)
{
m_ssError = "Failed to create event " + ssSyncTxPath + ": ";
fnReportWin32Error();
fnCloseAll();
return;
}
// Create sync event object
m_hSignalRx = CreateEventA(nullptr, FALSE, FALSE, ssSyncRxPath.c_str());
if (!m_hSignalRx || m_hSignalRx == INVALID_HANDLE_VALUE)
{
m_ssError = "Failed to create event " + ssSyncRxPath + ": ";
fnReportWin32Error();
fnCloseAll();
return;
}
if (bServer)
{
// Create the file mapping object
m_hMapFile = CreateFileMappingA(INVALID_HANDLE_VALUE, // use paging file
NULL, // default security
PAGE_READWRITE, // read/write access
0, // maximum object size (high-order DWORD)
m_uiSize, // maximum object size (low-order DWORD)
ssNamePath.c_str()); // name of mapping object
} else
{
// Open the file mapping object
m_hMapFile = OpenFileMappingA(FILE_MAP_ALL_ACCESS, // read/write access
FALSE, // do not inherit the name
ssNamePath.c_str()); // name of mapping object
}
if (!m_hMapFile || m_hMapFile == INVALID_HANDLE_VALUE)
{
m_ssError = "Failed to access file mapping " + ssNamePath + ": ";
fnReportWin32Error();
fnCloseAll();
return;
}
// Map the file into memory
m_pBuffer = reinterpret_cast<uint8_t*>(MapViewOfFile(m_hMapFile, // handle to map object
FILE_MAP_ALL_ACCESS, // read/write permission
0, // Offset high
0, // Offset low
m_uiSize)); // Amount of bytes
if (!m_pBuffer)
{
m_ssError = "Failed to create file mapping view: ";
fnReportWin32Error();
fnCloseAll();
return;
}
if (!m_uiSize)
{
// Request the size of the mapping
MEMORY_BASIC_INFORMATION sMemInfo{};
auto nRet = VirtualQuery(m_pBuffer, &sMemInfo, sizeof(sMemInfo));
if (nRet != sizeof(sMemInfo) || !sMemInfo.RegionSize)
{
m_ssError = "Failed to request mapping view size: ";
fnReportWin32Error();
fnCloseAll();
return;
}
m_uiSize = static_cast<uint32_t>(sMemInfo.RegionSize);
}
// If this is a server, the size causes the initialization. For a client, no initialization should take place (the server has
// done so already).
TAccessor::Attach(m_pBuffer, bServer ? m_uiSize : 0);
TRACE("Accessed shared memory for ", m_ssSyncTx, " and ", m_ssSyncRx, ".");
}
template <class TAccessor>
inline CSharedMemBuffer<TAccessor>::CSharedMemBuffer(const std::string& rssConnectionString)
{
if (rssConnectionString.empty())
{
m_ssError = "Missing connection string.";
return;
}
// Interpret the connection string
sdv::toml::CTOMLParser config(rssConnectionString);
// The connection string can contain multiple parameters. Search for the first parameters fitting the accessor direction
size_t nIndex = 0;
sdv::toml::CNodeCollection nodeConnectParamCollection = config.GetDirect("ConnectParam");
do
{
sdv::toml::CNodeCollection nodeConnectParam;
switch (nodeConnectParamCollection.GetType())
{
case sdv::toml::ENodeType::node_array:
if (nIndex >= nodeConnectParamCollection.GetCount()) break;
nodeConnectParam = nodeConnectParamCollection[nIndex];
break;
case sdv::toml::ENodeType::node_table:
if (nIndex > 0) break;
nodeConnectParam = nodeConnectParamCollection;
break;
default:
break;
}
if (nodeConnectParam.GetType() != sdv::toml::ENodeType::node_table) break;
nIndex++;
// Check for shared memory
if (nodeConnectParam.GetDirect("Type").GetValue() != "shared_mem") continue;
// Check the direction
if (nodeConnectParam.GetDirect("Direction").GetValue() !=
(TAccessor::GetAccessType() == EAccessType::rx ? "response" : "request"))
continue;
// Get the information
m_ssName = static_cast<std::string>(nodeConnectParam.GetDirect("Location").GetValue());
m_ssSyncTx = static_cast<std::string>(nodeConnectParam.GetDirect("SyncTx").GetValue());
m_ssSyncRx = static_cast<std::string>(nodeConnectParam.GetDirect("SyncRx").GetValue());
break;
} while (true);
if (m_ssName.empty() || m_ssSyncTx.empty() || m_ssSyncRx.empty())
{
m_ssError = "Incomplete connection information.";
return;
}
// Create a path
std::string ssPath = /*"Global\\" +*/ m_ssName;
std::string ssSyncTxPath = /*"Global\\" +*/ m_ssSyncTx;
std::string ssSyncRxPath = /*"Global\\" +*/ m_ssSyncRx;
auto fnReportWin32Error = [this]()
{
TCHAR* szMsg = nullptr;
FormatMessage(FORMAT_MESSAGE_ALLOCATE_BUFFER | FORMAT_MESSAGE_FROM_SYSTEM | FORMAT_MESSAGE_IGNORE_INSERTS,
NULL,
GetLastError(),
MAKELANGID(LANG_ENGLISH, SUBLANG_ENGLISH_US),
(LPTSTR)&szMsg,
0,
NULL);
m_ssError += sdv::MakeUtf8String(szMsg);
LocalFree(szMsg);
};
auto fnCloseAll = [this]()
{
if (m_pBuffer) UnmapViewOfFile(m_pBuffer);
if (m_hMapFile && m_hMapFile != INVALID_HANDLE_VALUE) CloseHandle(m_hMapFile);
if (m_hSignalTx && m_hSignalTx != INVALID_HANDLE_VALUE) CloseHandle(m_hSignalTx);
if (m_hSignalRx && m_hSignalRx != INVALID_HANDLE_VALUE) CloseHandle(m_hSignalRx);
m_pBuffer = 0;
m_hMapFile = INVALID_HANDLE_VALUE;
m_hSignalTx = INVALID_HANDLE_VALUE;
m_hSignalRx = INVALID_HANDLE_VALUE;
};
// Create TX sync event object
m_hSignalTx = CreateEventA(nullptr, FALSE, FALSE, ssSyncTxPath.c_str());
if (!m_hSignalTx || m_hSignalTx == INVALID_HANDLE_VALUE)
{
m_ssError = "Failed to create event " + ssSyncTxPath + ": ";
fnReportWin32Error();
fnCloseAll();
return;
}
// Create RX sync event object
m_hSignalRx = CreateEventA(nullptr, FALSE, FALSE, ssSyncRxPath.c_str());
if (!m_hSignalRx || m_hSignalRx == INVALID_HANDLE_VALUE)
{
m_ssError = "Failed to create event " + ssSyncRxPath + ": ";
fnReportWin32Error();
fnCloseAll();
return;
}
// Open the file mapping object
m_hMapFile = OpenFileMappingA(FILE_MAP_ALL_ACCESS, // read/write access
FALSE, // do not inherit the name
ssPath.c_str()); // name of mapping object
if (!m_hMapFile || m_hMapFile == INVALID_HANDLE_VALUE)
{
m_ssError = "Failed to open file mapping " + ssPath + ": ";
fnReportWin32Error();
fnCloseAll();
return;
}
// Map the file into memory
m_pBuffer = reinterpret_cast<uint8_t*>(MapViewOfFile(m_hMapFile, // handle to map object
FILE_MAP_ALL_ACCESS, // read/write permission
0, // Offset high
0, // Offset low
0)); // Amount of bytes (all in this case)
if (!m_pBuffer)
{
m_ssError = "Failed to create file mapping view: ";
fnReportWin32Error();
fnCloseAll();
return;
}
// Request the size of the mapping
MEMORY_BASIC_INFORMATION sMemInfo{};
auto nRet = VirtualQuery(m_pBuffer, &sMemInfo, sizeof(sMemInfo));
if (nRet != sizeof(sMemInfo) || !sMemInfo.RegionSize)
{
m_ssError = "Failed to request mapping view size: ";
fnReportWin32Error();
fnCloseAll();
return;
}
m_uiSize = static_cast<uint32_t>(sMemInfo.RegionSize);
TAccessor::Attach(m_pBuffer);
TRACE("Opened shared memory for ", m_ssSyncTx, " and ", m_ssSyncRx, ".");
}
template <class TAccessor>
CSharedMemBuffer<TAccessor>::~CSharedMemBuffer()
{
if (m_pBuffer) UnmapViewOfFile(m_pBuffer);
if (m_hMapFile && m_hMapFile != INVALID_HANDLE_VALUE) CloseHandle(m_hMapFile);
if (m_hSignalTx && m_hSignalTx != INVALID_HANDLE_VALUE) CloseHandle(m_hSignalTx);
if (m_hSignalRx && m_hSignalRx != INVALID_HANDLE_VALUE) CloseHandle(m_hSignalRx);
}
template <class TAccessor>
void CSharedMemBuffer<TAccessor>::Detach()
{
if (m_pBuffer) UnmapViewOfFile(m_pBuffer);
if (m_hMapFile && m_hMapFile != INVALID_HANDLE_VALUE) CloseHandle(m_hMapFile);
if (m_hSignalTx && m_hSignalTx != INVALID_HANDLE_VALUE) CloseHandle(m_hSignalTx);
if (m_hSignalRx && m_hSignalRx != INVALID_HANDLE_VALUE) CloseHandle(m_hSignalRx);
m_uiSize = 0u;
m_hMapFile = INVALID_HANDLE_VALUE;
m_ssName.clear();
m_pBuffer = nullptr;
m_ssSyncTx.clear();
m_hSignalTx = INVALID_HANDLE_VALUE;
m_ssSyncRx.clear();
m_hSignalRx = INVALID_HANDLE_VALUE;
m_ssError.clear();
}
template <class TAccessor>
inline std::string CSharedMemBuffer<TAccessor>::GetConnectionString() const
{
// The connection string contains the TOML file for connecting to this shared memory.
std::stringstream sstream;
sstream << "[[ConnectParam]]" << std::endl;
sstream << "Type = \"shared_mem\"" << std::endl;
sstream << "Location = \"" << m_ssName << "\"" << std::endl;
sstream << "SyncTx = \"" << m_ssSyncTx << "\"" << std::endl;
sstream << "SyncRx = \"" << m_ssSyncRx << "\"" << std::endl;
// The target direction is the opposite of the direction of the accessor. Therefore, if the accessor uses an RX access type,
// the target uses an TX access type and should be configured as response, otherwise it is a request.
sstream << "Direction = \"" << (TAccessor::GetAccessType() == EAccessType::rx ? "request" : "response") << "\"" << std::endl;
return sstream.str();
}
template <class TAccessor>
inline void CSharedMemBuffer<TAccessor>::TriggerDataSend()
{
if (!m_hSignalTx || m_hSignalTx == INVALID_HANDLE_VALUE) return;
SetEvent(m_hSignalTx);
}
template <class TAccessor>
inline bool CSharedMemBuffer<TAccessor>::WaitForData(uint32_t uiTimeoutMs) const
{
if (!m_hSignalTx || m_hSignalTx == INVALID_HANDLE_VALUE) return false;
// Check whether there is data; if so, return true.
if (TAccessor::HasUnreadData())
return true;
return WaitForSingleObject(m_hSignalTx, uiTimeoutMs) == WAIT_OBJECT_0;
}
template <class TAccessor>
inline void CSharedMemBuffer<TAccessor>::TriggerDataReceive()
{
if (!m_hSignalRx || m_hSignalRx == INVALID_HANDLE_VALUE) return;
SetEvent(m_hSignalRx);
}
template <class TAccessor>
inline bool CSharedMemBuffer<TAccessor>::WaitForFreeSpace(uint32_t uiTimeoutMs) const
{
if (!m_hSignalRx || m_hSignalRx == INVALID_HANDLE_VALUE) return false;
DWORD dwResult = WaitForSingleObject(m_hSignalRx, uiTimeoutMs);
if (TAccessor::Canceled())
return false;
return dwResult == WAIT_OBJECT_0;
}
#endif // !defined WINDOWS_SHARED_MEM_BUFFER_H

View File

@@ -0,0 +1,192 @@
#include "watchdog.h"
#include "connection.h"
#include <vector>
#include "../../global/trace.h"
#ifdef __unix__
#include <signal.h>
#endif
CWatchDog::CWatchDog()
{
m_threadScheduledConnectionDestructions = std::thread(&CWatchDog::HandleScheduledConnectionDestructions, this);
}
CWatchDog::~CWatchDog()
{
Clear();
}
void CWatchDog::Clear()
{
sdv::process::IProcessLifetime* pMonitorMgnt = sdv::core::GetObject<sdv::process::IProcessLifetime>("ProcessControlService");
if (pMonitorMgnt)
{
std::unique_lock<std::mutex> lockMonitors(m_mtxMonitors);
for (const auto& rvtProcessMonitor : m_mapProcessMonitors)
pMonitorMgnt->UnregisterMonitor(rvtProcessMonitor.second);
m_mapProcessMonitors.clear();
}
// Shift the connections to a local variable to be able to delete the connections without being in the lock region.
std::unique_lock<std::mutex> lockConnections(m_mtxConnections);
auto mapConnections = std::move(m_mapConnections);
lockConnections.unlock();
// Remove the connections.
mapConnections.clear();
// Finalize the asnyhronous destructions
m_bShutdown = true;
if (m_threadScheduledConnectionDestructions.joinable())
m_threadScheduledConnectionDestructions.join();
}
void CWatchDog::AddConnection(const std::shared_ptr<CConnection>& rptrConnection)
{
if (!rptrConnection) return;
#if ENABLE_REPORTING > 0
TRACE("Registering connection ", rptrConnection->IsServer() ? "server" : "client");
#endif
std::unique_lock<std::mutex> lock(m_mtxConnections);
m_mapConnections.try_emplace(rptrConnection.get(), rptrConnection);
}
void CWatchDog::RemoveConnection(CConnection* pConnection, bool bAsync)
{
#if ENABLE_REPORTING > 0
if (bAsync)
TRACE("Scheduling destruction connection ", pConnection->IsServer() ? "server" : "client");
else
TRACE("Destroying connection ", pConnection->IsServer() ? "server" : "client");
#endif
// Find the connection and move it in a local variable to release it when outside the lock region.
std::unique_lock<std::mutex> lockConnections(m_mtxConnections);
auto itConnection = m_mapConnections.find(pConnection);
if (itConnection == m_mapConnections.end()) return;
auto ptrConnection = std::move(itConnection->second);
m_mapConnections.erase(itConnection);
if (bAsync)
{
// Shift the connection into the queue.
m_queueScheduledConnectionDestructions.push(std::move(ptrConnection));
m_cvTriggerConnectionDestruction.notify_all();
}
lockConnections.unlock();
// Release the connection
ptrConnection.reset();
}
void CWatchDog::AddMonitor(sdv::process::TProcessID tProcessID, CConnection* pConnection)
{
if (!tProcessID || !pConnection) return;
sdv::process::IProcessLifetime* pMonitorMgnt = sdv::core::GetObject<sdv::process::IProcessLifetime>("ProcessControlService");
if (!pMonitorMgnt) return;
std::unique_lock<std::mutex> lock(m_mtxMonitors);
#if ENABLE_REPORTING > 0
TRACE("Registering ", pConnection->IsServer() ? "server" : "client", " watchdog monitor for PID#", tProcessID);
#endif
// Add the process monitor if not already assigned
m_mapProcessMonitors.try_emplace(tProcessID, pMonitorMgnt->RegisterMonitor(tProcessID, this));
#if ENABLE_REPORTING > 0
TRACE("Registering watchdog for PID#", tProcessID);
#endif
// Add a monitor entry
m_mapMonitors.insert(std::make_pair(tProcessID, pConnection->shared_from_this()));
}
void CWatchDog::RemoveMonitor(const CConnection* pConnection)
{
if (!pConnection) return;
std::unique_lock<std::mutex> lock(m_mtxMonitors);
#if ENABLE_REPORTING > 0
TRACE("Removing ", pConnection->IsServer() ? "server" : "client", " watchdog monitor");
#endif
// Earse all monitors for the provided connection
auto itMonitor = m_mapMonitors.begin();
while (itMonitor != m_mapMonitors.end())
{
std::shared_ptr<CConnection> ptrConnection = itMonitor->second.lock();
if (ptrConnection.get() == pConnection)
itMonitor = m_mapMonitors.erase(itMonitor);
else
itMonitor++;
}
}
void CWatchDog::HandleScheduledConnectionDestructions()
{
while (!m_bShutdown)
{
std::unique_lock<std::mutex> lock(m_mtxMonitors);
m_cvTriggerConnectionDestruction.wait_for(lock, std::chrono::milliseconds(100));
while (m_queueScheduledConnectionDestructions.size())
{
std::shared_ptr<CConnection> ptrConnection = std::move(m_queueScheduledConnectionDestructions.front());
m_queueScheduledConnectionDestructions.pop();
lock.unlock();
#if ENABLE_REPORTING > 0
TRACE("Final destroying connection ", ptrConnection->IsServer() ? "server" : "client");
#endif
ptrConnection.reset();
lock.lock();
}
}
}
void CWatchDog::ProcessTerminated(/*in*/ sdv::process::TProcessID tProcessID, /*in*/ int64_t /*iRetValue*/)
{
std::unique_lock<std::mutex> lock(m_mtxMonitors);
// Unregister the monitor
sdv::process::IProcessLifetime* pMonitorMgnt = sdv::core::GetObject<sdv::process::IProcessLifetime>("ProcessControlService");
if (!pMonitorMgnt) return;
auto itProcessMonitor = m_mapProcessMonitors.find(tProcessID);
if (itProcessMonitor == m_mapProcessMonitors.end()) return;
pMonitorMgnt->UnregisterMonitor(itProcessMonitor->second);
m_mapProcessMonitors.erase(itProcessMonitor);
// Find the monitor in the map, remove it and add the connection to a to-be-disconnected vector.
std::vector<std::shared_ptr<CConnection>> vecDisconnectedConnections;
auto itMonitor = m_mapMonitors.find(tProcessID);
while (itMonitor != m_mapMonitors.end() && itMonitor->first == tProcessID)
{
#if ENABLE_REPORTING > 0
TRACE("Removing watchdog for PID#", tProcessID);
#endif
// Add the connection to the vector (if the connection was not removed before).
auto ptrConnection = itMonitor->second.lock();
if (ptrConnection) vecDisconnectedConnections.push_back(ptrConnection);
// Remove the monitor
itMonitor = m_mapMonitors.erase(itMonitor);
}
lock.unlock();
// Inform the connection about the removed process.
for (auto& rptrConnection : vecDisconnectedConnections)
{
rptrConnection->SetStatus(sdv::ipc::EConnectStatus::disconnected_forced);
#if ENABLE_REPORTING > 0
TRACE("Forced disconnection for PID#", tProcessID);
#endif
}
}

View File

@@ -0,0 +1,120 @@
#ifndef WATCH_DOG_H
#define WATCH_DOG_H
#ifdef _WIN32
// Resolve conflict
#pragma push_macro("interface")
#undef interface
#ifndef NOMINMAX
#define NOMINMAX
#endif
#include <WinSock2.h>
#include <Windows.h>
// Resolve conflict
#pragma pop_macro("interface")
#ifdef GetClassInfo
#undef GetClassInfo
#endif
#endif
#include <interfaces/process.h>
#include <support/interface_ptr.h>
#include <mutex>
#include <map>
#include <memory>
#include <set>
#include <thread>
#include <queue>
#include <condition_variable>
// Forward declaration
class CConnection;
/**
* @brief The watch dog monitors the processes and connections.
* @details The watch dog keeps track of the processes and disconnects when the process the connection is with doesn't exist any
* more. This method is has advantages to using time-outs. If a process is being debugged and a breakpoint is triggered, all
* threads within the process are paused, which could cause a timeout by the calling process waiting for an answer. By monitoring
* the process existence, a pause doesn't cause any error. If the process crashes, it is being removed by the OS, causing the
* monitor to detect this and the connection being terminated. Disadvantage is, that if the process is in a deadlock, this cannot
* be detected by this method.
*/
class CWatchDog : public sdv::IInterfaceAccess, public sdv::process::IProcessLifetimeCallback
{
public:
/**
* @brief Constructor
*/
CWatchDog();
/**
* @brief Destructor
*/
~CWatchDog();
// Interface map
BEGIN_SDV_INTERFACE_MAP()
SDV_INTERFACE_ENTRY(sdv::process::IProcessLifetimeCallback)
END_SDV_INTERFACE_MAP()
/**
* @brief Unregister all remaining monitors.
*/
void Clear();
/**
* @brief Add a connection object used for monitoring.
* @param[in] rptrConnection Reference to the connection shared pointer.
*/
void AddConnection(const std::shared_ptr<CConnection>& rptrConnection);
/**
* @brief Remove a connection.
* @param[in] pConnection The connection pointer to remove the monitor for.
* @param[in] bAsync Use asnyhronous removal (use when the removal was triggered by a thread owned by the connection).
*/
void RemoveConnection(CConnection* pConnection, bool bAsync);
/**
* @brief Add a process monitor with the connection object to information.
* @param[in] tProcessID Process ID of the process to monitor.
* @param[in] pConnection Pointer to the connection to inform when the process is killed.
*/
void AddMonitor(sdv::process::TProcessID tProcessID, CConnection* pConnection);
/**
* @brief Remove a monitor.
* @param[in] pConnection The connection pointer to remove the monitor for.
*/
void RemoveMonitor(const CConnection* pConnection);
private:
/**
* @brief Thread function to destroy scheduled connections for destruction.
*/
void HandleScheduledConnectionDestructions();
/**
* @brief Called when the process was terminated. Overload of sdv::process::IProcessLifetimeCallback::ProcessTerminated.
* @remarks The process return value is not always valid. The validity depends on the support of the underlying system.
* @param[in] tProcessID The process ID of the process being terminated.
* @param[in] iRetValue Process return value or 0 when not supported.
*/
virtual void ProcessTerminated(/*in*/ sdv::process::TProcessID tProcessID, /*in*/ int64_t iRetValue) override;
std::mutex m_mtxConnections; ///< Protect the map access.
std::map<CConnection*, std::shared_ptr<CConnection>> m_mapConnections; ///< Connection map.
std::mutex m_mtxMonitors; ///< Protect the map access.
std::multimap<sdv::process::TProcessID, std::weak_ptr<CConnection>> m_mapMonitors; ///< Process monitor.
std::map<sdv::process::TProcessID, uint32_t> m_mapProcessMonitors; ///< Map of process monitor cookies.
std::condition_variable m_cvTriggerConnectionDestruction; ///< Condition variable used to trigger when a
///< connection is scheduled for destruction.
std::queue<std::shared_ptr<CConnection>> m_queueScheduledConnectionDestructions; ///< Scheduled connection for destruction.
std::thread m_threadScheduledConnectionDestructions; ///< Thread processing the scheduled destructions.
bool m_bShutdown = false; ///< Set when shutting down the watchdog
};
#endif // !defined WATCH_DOG_H