# OpenDHT源码解析
### 简介
OpenDHT具有以下特性:
- 以分布式网络的形式共享key->value数据存储
- 同时支持IPV4和IPV6
- value可以是任意二级制,并且最多64KB,key最长可达160位
- 相同的key可以对应不同的value,这些value可以被一个64位的keyID进行区分
- 每一份数据都有对应的"数据类型".每个数据类型定义了可能的复杂存储、编辑、过期规则,允许实例设置每个值的过期时间.所有被支持的"数据类型"以硬编码的形式存在与代码中,并且对每一个节点是可知的
>注意:OpenDHT与Bittorrent DHT不兼容。
处于最上层的DHT带有供使用者自行选择的公钥加密机制,该机制可以用来对DHT网络的数据进行签名或者加密。被签名的数据只有其所有者拥有编辑权限。程序自动检查从DHT网络获取的被签名的数据,并且只有在签名校验成功的用户才可以查看。
### OpenDHT API接口简介
OpenDHT使用`dht`作为C++命名空间,主要有以下主要的类组成:
- **Infohash** 用来识别DHT网络中的节点和值,是由HASH_LEN个字节组成组成的字节数组。实例化的infohash对象可以用关系运算符`==`进行比较。开发者可以根据给出的字符串或者二进制数据通过调用静态方法`Infohash::get()`计算出hash,例如,调用`Infohash::get("my_key")`获取"my_key"对应的SHA1值。
- **Value** 用来表示存储在DHT网络中的一个值,该值根据给定的InfoHash存储,同样有一个独一无二的ID,用来区分保存在同一个位置的多个值。`dht::Value`是get操作结果数据的数据类型,是put操作的参数数据类型。一个`dht::Value`数据可以由任意二进制数据构造,例如,直接使用构造函数`dht::Value::Value(const std::vector&)`,或者C类型函数`dht::Value::Value(const uint8_t* ptr, size_t len)`。
- **ValueType** 定义了数据在DHT网络中的存储规则,包括保存时间、存储格式以及编辑规则等。每一个被存储的`value`都有与之相对应的`value type`。需要注意的是`value type`通常对数据的序列化没有影响
- **Value::Filter** 是一个继承自`std::function`的类。它可以使开发者规定一个数据是否应该被返回给用户,还定义了一些有用的方法,如:`chain(Value::Filter&&)`和`chainOr(Value::Filter&&)`。
- **Query** 比较类似于`Filter`,`Query`可以用于筛选数据,也可以用于筛选数据中的变量。*query和filter最大的区别在于,query是被远端的节点执行的,开发者在使用库的时候可以对传输有更好的控制*
- **Dht** 该类实现了分布式哈希表的节点,并且提供了基本的操作。它需要一个已有的UDPsocket来发送数据包。单独使用的时候,当接收到一个数据包时,方法`Dht::periodic`需要被循环调用。
- **SecureDht** 是`dht::Dht`的兄弟类,在`dht::Dht`接口基础上增加了检查数据签名、加解密数据,并为发布签名或加密的数据提供额外的方法接口。
- **DhtRunner** 为DHT提供线程安全的接口。DhtRunner是OpenDHT使用最频繁的应用接口类:该类可以被不同的组件或线程安全地并行使用,多用于管理网络传输。
### 回调
Get/listen 操作使用了**GetCallback**和**GetCallbackSimple**作为回调参数
using GetCallback = std::function>& values)>;
using GetCallbackSimple = std::function& value)>;
Query操作使用了类型为**QueryCallback**的回调作为参数,定义如下:
using QueryCallback = std::function>& fields)>;
许多操作同样使用了“操作完成”的回调**DoneCallback**作为参数,定义如下:
using DoneCallback = std::function
### dht::Dht
该类提供了整个组件的核心API。主要的方法有:
#### Constructor
```
Dht::Dht(int s, int s6, const InfoHash& id)
```
构造函数使用了用于发送数据的IPv4,IPv6的udp socket和node ID作为参数。需要至少为正在运行的Dht实例提供一个可用的socket。另一个没有提供可用的socket,需要用`-1`作为参数。
使用了OpenDHT的应用程序都会用DhtRunner类来处理网络传输、提供线程安全的接口。
#### Get
```
void Dht::get(const InfoHash& key, GetCallback cb, DoneCallback donecb={}, Value::Filter f = {}, Query q = {});
```
`Get`根据给定的`key`发起一个搜索操作,从所有可用协议(IPv4/IPv6)中查找对应的values。搜索到结果时会调用第二个参数`cb`返回搜到的结果。该回调会在搜到新的value时被多次调用,直到回调函数返回false。
当操作结束时,作为可选项,回调函数`DoneCallback`将会被调用。
`Filter`可选项,在把结果传递给回调函数之前,对值进行预筛选
`Query`可选的筛选函数,在远端节点筛选值
Dht::get示例:
```
//node is a running instance of dht::Dht
node.get(
dht::InfoHash::get("some_key"),
[](const std::vector>& values) {
for (const auto& v : values)
std::cout << "Got value: " << *v << std::endl;
return true; // keep looking for values
},
[](bool success) {
std::cout << "Get finished with " << (success ? "success" : "failure") << std::endl;
}
);
```
#### Query
```
void Dht::query(const InfoHash& key, QueryCallback cb, DoneCallback done_cb = {}, Query&& q = {});
```
`Query`根据提供的key,在网络上发起一个搜索,最终获取指定范围的值。在搜索的过程中,搜索结果将被提供给第二个参数`cb`。该回调会在搜到新的value时被多次调用,直到回调函数返回false。
当操作完成时,作为可选的回调`DoneCallback`将会被调用。
`Filter`可选的函数,在把结果传递给回调函数之前,对值进行预筛选
`Query`可选的筛选函数,在远端节点筛选值
Dht::query示例:
```
//node is a running instance of dht::Dht
node.query(
dht::InfoHash::get("some_key"),
[](const std::vector>& fields) {
for (const auto& i : fields)
std::cout << "Got index: " << *i << std::endl;
return true; // keep looking for field value index
},
[](bool success) {
std::cout << "Get finished with " << (success ? "success" : "failure") << std::endl;
}
);
```
#### Put
void Dht::put(const InfoHash& key, const std::shared_ptr& value, DoneCallback cb = {});
`Put` 在所有可用网络协议(IPv4/IPv6)中根据提供的key发布一个数据。查看更多关于如何构建`dht::Value`实例的信息可以参考[数据序列化](https://github.com/savoirfairelinux/opendht/wiki/Data-serialization)。当操作完成时,作为可选的回调`DoneCallback`将会被调用(无论成功或失败)。
调用`put`时,如果数据ID为`dht::Value::INVALID_ID(0)`,对数据ID操作的过程中,会对成员变量`Value::id`进行赋值。
一个数据会在网络中保存其整个生命周期(默认10分钟)。调用`put`并且使用同样的key和数据可以刷新有效时间。数据默认无法被编辑(有签名的数据会产生异常)。若网络中存在相同的数据,新数据默认被忽略。
Dht::put示例:
```
const char* my_data = "42 cats";
//node is a running instance of dht::Dht
node.put(
dht::InfoHash::get("some_key"),
dht::Value((const uint8_t*)my_data, std::strlen(my_data))
);
```
#### Listen
```
size_t Dht::listen(const InfoHash& key, GetCallback cb, Value::Filter q = {}, Query q = {});
```
Listen监听某一个hash对应数据的任何改变,新节点加入或移除时,会受到相关的更新通知。首先在网络上根据提供的key发起一个搜索对应数据的操作,并且每当该key对应的数据有改变或者更新时,都可以通过提供的回调函数`cb`接收到通知,直到`cb`返回false或者操作通过调用`bool cancelListen(const InfoHash& key, size_t token)`函数被取消(其中参数`token`是`listen`的返回值)。调用`cancelListen`和回调函数返回false有相同的效果。
Dht::listen示例:
auto key = dht::InfoHash::get("some_key");
auto token = node.listen(key,
[](const std::vector>& values) {
for (const auto& v : values)
std::cout << "Found value: " << *v << std::endl;
return true; // keep listening
}
);
// later
node.cancelListen(key, std::move(token));
监听非序列化的模板类型:
struct Cloud {
uint32_t altitude;
double width, height;
bool rainbow;
MSGPACK_DEFINE_MAP(altitude, width, height, rainbow);
}
std::vector found_clouds;
auto key = dht::InfoHash::get("some_key");
auto token = node.listen(key, [](Cloud&& value) {
// warning: called from another thread
found_clouds.emplace_back(std::move(value));
}
);
// later
node.cancelListen(key, token);
### 过滤(Filters)和枚举(Queries)
#### 过滤(Filters)
过滤器使用函数特例`std::function`来过滤数据。
```
auto coolValueFilter = [](const dht::Value& v) {
return v.user_type == "cool" and v.data.size() < 64;
};
node.get(
dht::InfoHash::get("coolKey"),
[](const std::shared_ptr& value) {
std::cout << "That's a cool value: " << *v << std::endl;
return true; // keep looking for values
},
[](bool success) {
std::cout << "Op went " << (success ? "cool" : "not cool") << std::endl;
},
coolValueFilter);
```
如上所示,`Value::Filter`类非常灵活。不过,这个过滤器只有在本地节点收到回应数据时才会被执行。如果已知所感兴趣的存储存储质已经在集成大量的数据并且你不想造成网络拥堵,此时可以使用`queries`!
#### 枚举(Queries)
如下示例是用queries实现上面示例同样的功能:
```
Where w;
w.id(5); /* the same as Where w("WHERE id=5"); */
node.get(
dht::InfoHash::get("some_key"),
[](const std::vector>& values) {
for (const auto& v : values)
std::cout << "This value has passed through the remotes filters " << *v << std::endl;
return true; // keep looking for values
},
[](bool success) {
std::cout << "Get finished with " << (success ? "success" : "failure") << std::endl;
}, {}, w
);
```
所有可用变量如下:
|Field|
|---|
|Id|
|ValueType|
|OwnerPk|
|UserType|
一个枚举可以通过另外一个枚举知道自己条件是否已达到,例如:
```
Query q1;
q1.where.id(5); // the whole value with id=5 will be sent
Query q2 {{"SELECT value_type"}};
// q2 the same as Query q("SELECT * WHERE value_type=10,user_type=foo_type");
q2.where.valueType(10).userType("foo_type");
Query q3("SELECT id WHERE id=5"); // only the id=5 will be sent
q1.isSatisfiedBy(q3); // false
q2.isSatisfiedBy(q1); // false
q3.isSatisfiedBy(q1); // true
q2.isSatisfiedBy(q3); // false
```
### dht::SecureDht
该类是dht::Dht的扩展,提供了相同的API接口(get,put,listen)。它在DHT中添加了一层公钥加密,一个由用户提供或者自动生成的RSA密钥对将被应用于签名和解密。
通过`::get`和`::listen`返回给用户的数据会在返回之前进行校验和过滤:如果证书校验失败则丢弃该数据。类似的,我们无法解密的那些加密数据也同样会被丢弃或者把这些加密数据再提供给其它用户。
用户可以通过比对变量`recipient`(也就是我们的公钥ID)的值来判断数据是否被加密。
作为`Dht`最上面的一层,`SecureDht`同样可以被用于操作数据。对于未加密和签名的数据,其`get`和`put`方法跟`Dht`的方法一样。
除此之外,SecureDht还增加了一些新的方法:
#### PutSigned
```
void putSigned(const InfoHash& hash, const std::shared_ptr& val, DoneCallback callback);
```
#### PutEncrypted
```
void putEncrypted(const InfoHash& hash, const InfoHash& to, std::shared_ptr val, DoneCallback callback);
```
### dht::DhtRunner
一个DHT节点可以直接把`dht::Dht`单独作为应用到C++中、集成在程序主循环体中,也可以用`dht::DhtRunner`(推荐),`dht::DhtRunner`是对`dht::Dht`类的封装。
#### 使用`dht::DhtRunner`运行一个节点
`dht::DhtRunner`类提供了对一个运行中的DHT实例线程安全的访问,同时也管理着sockets网络。
实际上,`DhtRunner`运行了一个`dht::SecureDht`实例,以方便进行加密操作,当用户需要的时候,一个RSA秘钥对可以被用来签名/加密数据(见`DhtRunner::run`)。
```
dht::DhtRunner node;
// Launch a dht node on a new thread, using a
// generated RSA key pair, and listen on port 4222.
node.run(4222, dht::crypto::generateIdentity(), true);
// use the node...
// stop the node
node.join();
// node.run() can be called again
```
方法`run`的定义如下:
```
void run(in_port_t port, const crypto::Identity identity, bool threaded = false, StatusCallback status_cb = nullptr);
```
- `port` 是用于绑定的UDP端口号
- `identity` 用于加密的RSA密钥对,新的秘钥对可由函数`dht::crypto::generateIdentity()`生成
- `threaded` 定义了是否需要一个新的线程来运行DHT。如果是`true`就不需要进一步的操作来处理DHT。如果是`false`,`DhtRunner::loop()`必须被经常调用
- `status_cb` 是一个状态回调,用来通知IPv4和IPv6的DHT连接状态(正在连接、已连接...)
在一个实例已经在工作的时候,调用`run`不会有任何效果。
#### 加入现有的网络
一个节点可以通过任何其它已连接的节点加入现有的OpenDHT网络。加入网络的方法可以使用以下之一:
```
void bootstrap(const char* host, const char* service);
void bootstrap(const std::vector>& nodes);
void bootstrap(const std::vector& nodes);
```
前两个会ping指定的IP地址。第一个使用一个字符串表示的真实IP地址,第二个使用`sockaddr`结构体。
第三个方法用来加入已知的网络。它会把已知的节点加入到路由表中并且仅在需要的时候才会联系它们。该方法使用了一个类型为`Dht::NodeExport`的容器,由之前执行的`exportNodes()`获取。`Dht::NodeExport`是一个公共结构体,定义如下:
```
struct NodeExport {
InfoHash id;
sockaddr_storage ss;
socklen_t sslen;
};
```
从OpenDHT1.3.7开始,`Dht::NodeExport`就是消息数据可序列化的,以便`exportNodes()`的结果轻松地被序列化/反序列化,如:
```
dht::DhtRunner node;
// Export nodes to binary file
std::ofstream myfile("dhtNodeExport.bin", std::ios::binary);
msgpack::pack(myfile, node.exportNodes());
// Import nodes from binary file
msgpack::unpacker pac;
{
// Read whole file
std::ifstream myfile("dhtNodeExport.bin", std::ios::binary|std::ios::ate);
auto size = myfile.tellg();
myfile.seekg (0, std::ios::beg);
pac.reserve_buffer(size);
myfile.read (pac.buffer(), size);
pac.buffer_consumed(size);
}
// Import nodes
msgpack::object_handle oh;
while (pac.next(oh)) {
auto imported_nodes = oh.get().as>();
std::cout << "Importing : " << imported_nodes.size() << " nodes" << std::endl;
node.bootstrap(imported_nodes);
}
```
### OpenDHT非对外接口
除了上述对外的重要类和方法之外,这里介绍内部使用的一些类和方法,这些内部类和方法体现了OpenDHT运行逻辑,包括数据包的组合、网络的收发、接收数据的处理、节点之间的通信等。
#### 消息循环 DhtRunner::startNetwork
```
void DhtRunner::startNetwork(const SockAddr sin4, const SockAddr sin6);
```
该函数是唯一的接收网络数据的位置,根据传递进来的IPv4和IPv6信息产生对应的socket描述符,之后创建线程,采用select模式循环接收IPv4或IPv6数据。接收到数据后,连同发送方的信息一起保存到vector`buf`中,再发出信号通知工作线程处理数据。接收数据代码如下:
```
try {
while (running_network) {
struct timeval tv {/*.tv_sec = */0, /*.tv_usec = */250000};
fd_set readfds;
FD_ZERO(&readfds);
if(s4 >= 0)
FD_SET(s4, &readfds);
if(s6 >= 0)
FD_SET(s6, &readfds);
int rc = select(s4 > s6 ? s4 + 1 : s6 + 1, &readfds, nullptr, nullptr, &tv);
if(rc < 0) {
if(errno != EINTR) {
perror("select");
std::this_thread::sleep_for( std::chrono::seconds(1) );
}
}
if (not running_network)
break;
if(rc > 0) {
std::array buf;
sockaddr_storage from;
socklen_t from_len = sizeof(from);
if (s4 >= 0 && FD_ISSET(s4, &readfds))
{
rc = recvfrom(s4, (char*)buf.data(), buf.size(), 0, (sockaddr*)&from, &from_len);
}
else if(s6 >= 0 && FD_ISSET(s6, &readfds))
rc = recvfrom(s6, (char*)buf.data(), buf.size(), 0, (sockaddr*)&from, &from_len);
else
break;
if (rc > 0) {
{
std::lock_guard lck(sock_mtx);
rcv.emplace_back(Blob {buf.begin(), buf.begin()+rc+1}, SockAddr(from, from_len));
}
cv.notify_all();
}
}
}
} catch (const std::exception& e) {
std::cerr << "Error in DHT networking thread: " << e.what() << std::endl;
}
```
#### 网络通信消息类型 dht::MessageType
dht所有的网络数据可以分为以下几类,函数根据不同的消息类型做出相应的处理:
```
enum class MessageType {
Error = 0,
Reply,
Ping,
FindNode,
GetValues,
AnnounceValue,
Refresh,
Listen,
ValueData,
ValueUpdate
};
```
#### 网络通信核心 dht::NetworkEngine
`dht::NetworkEngine`是网络通信协议的抽象化。该类提供发送和处理接收数据的公共接口,利用这些接口可以处理所有关于节点的请求,一共有如下几类:
- **请求类接口**:主动向外发出数据包,向对方请求获取某些信息
- **常量数据**:包含程序默认的一些指标,如过期时间、包长度、请求频率等
- **数据处理**:对收到的数据做对应的分析处理
- **应答接口**:针对收到的请求做出的回应
- **回调函数**
##### NetworkEngine::send
该函数是dht的基础发送函数,声明如下:
```
int NetworkEngine::send(const char *buf, size_t len, int flags, const SockAddr& addr);
```
封装了UDP的`sendto`,支持IPv4和IPv6
##### NetworkEngine::sendPing
向一个给定的节点发送ping,如果该节点正常就会回复一个pong消息,视作一次握手
##### NetworkEngine::sendPong
用于回复发送方ping请求
##### NetworkEngine::processMessage
消息处理函数,原型如下:
```
void NetworkEngine::processMessage(const uint8_t *buf, size_t buflen, const SockAddr& from);
```
该函数首先把`buf`指针指向的二进制数据通过函数`msgpack::unpack`进行解包,得到`ParsedMessage`类型的明文数据(网络数据包的封装使用的是[msgpack-c库](https://github.com/msgpack/msgpack-c))。之后把明文数据交由`NetworkEngine::process`进行处理,`NetworkEngine::process`根据数据消息类型做出具体的操作。
#### 节点数据的存储和过期检测
##### 数据存储`Dht::storageStore`
```
bool Dht::storageStore(const InfoHash& id, const Sp& value, time_point created, const SockAddr& sa);
```
把提供的id和value保存在节点。
##### 过期检测`Dht::expireStorage`
```
void Dht::expireStorage(InfoHash h);
```
判断h相关联的数据是否已经失效,如果失效则清除数据。
### 运行过程摘要
1. 执行`put`时,先把任务放进`scheduler`任务队列中,在`scheduler.run`方法中被执行,推送数据时,先发送`request`请求,接收到对方`reply`回应时再执行`search`请求,搜索是否已经有该数据,如果有的话则进行`refresh`操作,如果没有则把数据推出去。
2. 任务并不是立即执行的,而是放到scheduler和pending_ops、rcv中,作为工作队列依次执行。
[https://www.youtube.com/watch?v=Qp19EXmEGgM](https://www.youtube.com/watch?v=Qp19EXmEGgM)
> 参考:https://github.com/savoirfairelinux/opendht/wiki
0 Comments latest
No comments.