Simple and fast communication library based on ZeroMQ.
It provides few easy to use C++ classes in order to connect different parties together. There are also C++ bindings to some ZMQ sockets and features, which are used by the library components themselves. Finally there some system-level software components which can be accessed by gRPC calls, in order to hide ZMQ details and to be able to develop microservices in any language supported by gRPC.
Table of Contents
- ZMQ C++ bindings with RAII design and exception safe code.
- Leverage of ZMQ thread safe sockets, like RADIO/DISH, CLIENT/SERVER.
- Multipart messages can be used with ZMQ thread safe sockets.
- Polling of ZMQ sockets made easy: can integrate with C++ range based loops, easy open/close sockets, etc..
- Multicore friendly, due to the usage of ZMQ and Boost::ASIO.
- Logging aware library: levels, customization and performance.
- Time elapsed measurement for timeouts.
- Endianess management for integer types.
- Timers can be polled like they were sockets.
- Microservices mini framework, based on workers and brokers.
- Connection management between worker and broker.
- Redesigned and extended ZMQ Clone Pattern.
- Late joining management in publish/subscribe patterns.
- Retry and keep alive to check for liveness.
- Eventually consistent and coherent distributed system, for high availability.
- Universally unique identifier support (UUID).
- Reliability by means of supported redundant connections.
(TODO: add references to classes where possibile.)
This library provides two main components which are worker and broker. The communication between them is based on ZMQ thread-safe sockets Radio/Dish and Client/Server, as shown in the following diagram:
Dispatch: A worker dispatches a message, i.e. a topic, to any connected broker. Every message is marked with a sequence number, at worker's side. Depending on the underlying ZMQ socket transport, a topic might be dispatched to more than a single broker.
ZMQ sockets: Radio/Dish.
Delivery: A broker delivers data to any connected workers, whenever it receives a topic. A broker filters out every message marked with a sequence number that is less or equal to the last seen sequence number, for every single worker. Every worker does such filtering as well. Every received topic is locally stored, at broker side.
ZMQ sockets: Radio/Dish.
Synchronize: A worker synchronizes with lastest broker's snapshot. A broker replies with its snapshot, before serving any other dispatched topics. Returned topics are the only ones marked as state, instead event ones are just delivered.
ZMQ sockets: Client/Server.
Upon instantiation of worker and broker classes, an unique UUID is assigned in order to distinguish them. Every components will connect/bind to the speicified ZMQ endopoints. After starting both components, an asynchronous connection is set up. Data will begin to flow as soon as the connection gets ready. Auto reconnection to the broker is automatically performed by the worker, in case the former disappears. At any time, a worker can synchronize with its connected broker.
Please see the examples folder for some actual code.
Some examples of possible configurations, by putting pieces together.
Sharing of State among Services
Every service can export and distribute its data to others and a central broker can manage the accesses to shared state.
Master and Replica
Whenever a service publishes its data, it can dispatch it to both a master and a replica. Switchover between master and replica, or activation of services, are beyond the scope of this library.
Redundancy for Cabling
It's possible to configure multiple paths from worker to broker, using more than just one network. Messages will be filtered out by both worker and broker, using sequence numbers.
Communication between workers and broker relys on a continuous flow of probes, at configurable specific time intervals. Every worker actively checks whether its delivery and dispatch sockets are connected to the broker. Conversely, snapshot socket(s) is checked on demand, that is when a synchronization action is performed.
Dispatch & Delivery
The following state machine represents how a worker checks for liveness of the dispatch and delivery round trip path. A worker shall send a probe to the dispatch socket, which shall be received later through the delivery socket. This dispatch and delivery path might potentially pass through multiple brokers, despites it requires a peer-to-peer communication among brokers.
The state machine has three states:
- Sockets are disconnected and connection is closed.
- Announcements are continuously dispatched, at every retry interval.
- Sockets are reconnected every timeout interval, in case of timeout.
- A transition to Stable happens if a reply is delivered.
- Keepalives are dispatched, at every delivered reply.
- A transition back to Trying happens in case of timeout.
TBD: Matching between dispached vs delivered probe to be implemented.
The operation of snapshot download involves some steps. In case just one endpoint is used to connect to a broker, then synchronization retries to until it completely succeeds. When multiple endpoints are configured, then for every download failure, then next endpoint is selected, until it finally succeeds or the maximum number of retry count is reached. Such endpoints might potentially belong to different brokers. A detailed state machine chart is shown below:
TDB: Handling of multiple snapshot endpoints to be implemented.
The gRPC interface in defined is the worker.proto specification file, which requires protobuf at least version 3.15.8 and gRPC libraries at least version 1.37.1. Such libraries are not provided by this project, but they are dynamically linked at system level.
The gRPC service mainly provides the following calls, which all are thread-safe:
- SetEndpoints: Changes the fuurin worker endpoints.
- SetSubscriptions: Registers the topic names to sync with fuurin broker.
- WaitForEvent: Waits for any incoming events. In case of multiple clients, events are multiplexed, that is they are all received by every client.
- Start: Starts the connection with broker. Following subsequent events are expected:
- Started: upon worker start.
- Online: upon connection with broker.
- Stop: Stops the connection with broker. Following subsequent events are expected:
- Offline: upon disconnection with broker.
- Stopped: upon worker stop.
- Dispatch: Sends a topic to the broker. Topic data is always treated as generic bytes. Following subsequent events are expected:
- Delivery: for every subscribed topic.
- Sync: Syncs with the broker. Following subsequent events are expected:
- SyncDownloadOn: upon sync start.
- SyncRequest: when sync request was sent to the broker.
- SyncBegin: when sync reply from broker was received.
- SyncElement: for every subscribed topic.
- SyncSuccess: in case of sync success.
- SyncError: in case of sync error.
- SyncDownloadOff: upon sync stop.
The fuurin library is released under the terms of Mozilla Public License 2.0. Please see the attached LICENSE file for further details.
LGPL v3 plus a static linking exception
Boost Software License 1.0
Apache License 2.0
How to build the library
CMake is used as project management tool to both build and test this library.
$> mkdir build $> cd build $> cmake /path/to/fuurin/folder $> make $> make install
Library is installed on Unix systems to the default path
though a different PREFIX can be specified.
For example library can be installed to the path
by setting the
$> make DESTDIR=/home/user install
How to make a debian package
Debian packages are generated through the following command:
$> cmake -D BUILD_DEB_PACKAGE=ON /path/to/fuurin/folder $> make $> (umask 022 && fakeroot make package)
Two packages are generated:
Development package depends on runtime, so the latter one must be installed first in order install the library headers.
How to run tests
In order to run tests, they must be enabled first:
$> cmake -D BUILD_TESTS=1 /path/to/fuurin/folder $> make $> ctest -v
How to build examples
In order to run examples, they must be enabled first:
$> cmake -D BUILD_EXAMPLES=1 /path/to/fuurin/folder $> make
How to enable sanitizers
Sanitizers can enabled with some cmake options:
- Address Sanitizer: cmake -D ENABLE_ASAN=ON ...
- Thread Sanitezer: cmake -D ENABLE_TSAN=ON ...
- Memory Sanitezer: cmake -D ENABLE_MSAN=ON ...
- Undefined Behavior Sanitizer: cmake -D ENABLE_UBSAN=ON ...
How to check coverage
Coverage can be obtained by specifying CMake option
$> cmake -D BUILD_TESTS=1 -D ENABLE_COVERAGE=1 /path/to/fuurin/folder $> make $> ctest -v $> lcov --directory . --capture --output-file coverage.info $> genhtml coverage.info --output-directory coverage
How to compile with clang
$> cmake -D CMAKE_C_COMPILER=/usr/bin/clang -D CMAKE_CXX_COMPILER=/usr/bin/clang++ /path/to/fuurin/folder $> make
External libraries used by fuurin are integrated through a git subtree approach. For example, Boost unit test framework may be added in this manual way:
- Create an orphan branch where to store the external library
$> cd /path/to/fuurin/folder $> git checkout --orphan vendor/boost $> git reset $> git ls-files -o | xargs rm
- Extract a subset of Boost
$> cd /tmp $> tar xf boost_1_65_1.tar.bz2 $> cd boost_1_65_1 $> ./bootstrap.sh $> ./b2 tools/bcp $> ./bin.v2/tools/bcp/.../bcp \ LICENSE_1_0.txt \ boost/test/unit_test.hpp \ boost/test/included/unit_test.hpp \ boost/test/data/test_case.hpp \ libs/test/test/test-organization-ts/datasets-test/* \ libs/test/example/* \ boost/mpl/list.hpp \ boost/scope_exit.hpp \ boost/typeof/incr_registration_group.hpp \ boost/uuid/uuid.hpp \ boost/uuid/uuid_generators.hpp \ boost/uuid/uuid_io.hpp \ boost/uuid/uuid_hash.hpp \ boost/asio.hpp \ /path/to/fuurin/folder
- Add Boost files to the orphan branch
$> cd /path/to/fuurin/folder $> git add boost/ libs/ $> git ci -m"boost: import test framework version 1.65.1"
- Checkout the Boost vendor branch into a subfolder of master branch
$> git checkout master $> git merge --allow-unrelated-histories -s ours --no-commit vendor/boost $> git read-tree -u --prefix=vendor/boost vendor/boost $> git commit
- Update the Boost library and merge back into the master branch
$> ## folder must be free of untracked files $> git checkout vendor/boost -- $> ## optional clear all tracked files $> git ls-files | xargs git rm $> ## update library by adding new untracked files $> git ls-files -o | xargs git add $> git ci -m"boost: library updated" $> git checkout master $> git merge -s subtree -X subtree=vendor/boost vendor/boost
- Contribution: TBD
- Writing tests: TBD
- Code review: TDB
- Pull request: TDB
- Coding standard: TDB