Comments (23)
我刚刚理解错你的意思了,sorry,我重新回复你一下:phxpaxos的batch propose是多个值绑定在同一个instance里的,其实就是多个client request是成组处理的,多个状态机也是成组执行的,具体请参见SMFac :: BatchExecute(),如果状态机不全部执行完成是不会更新InstanceID的。
我还有个疑问,你为什么要特意提到ExecuteForCheckpoint函数,这个函数是用户自定义的,说Execute函数更合理吧。
from phxpaxos.
@niukuo 补充一下:我看到了你提到了5条log,paxos log与instance是一一对应的,所以只有一条log。
from phxpaxos.
可能是表述的有问题 里面的“5条log”是想表达5个值的意思
考虑如下场景:
InstanceID=2中包含5个值,对应同一个状态机(由用户实现)。当前三个值应用(Execute)到状态机后,进程挂掉了。
用户应该何时更新状态机保存的CheckpointInstanceID(当phxpaxos调用GetCheckpointInstanceID时返回给phxpaxos)才能保证状态机的一致性?
from phxpaxos.
上面的场景是说用户应该如何保存Checkpoint
想到一种方法:每当Execute(或ExecuteForCheckpoint)发现InstanceID变了,将上一个InstanceID对应的一个或多个值应用到Checkpoint
但是这样对用户来说是否太复杂了?
from phxpaxos.
@niukuo 你说的没错,checkpoint是与上层应用挂钩的,所以phxpaxos很多地方只是提供抽象接口了事。你说的batch propose的ExecuteForCheckpoint其实就是phxpaxos里的SMFac :: BatchExecuteForCheckpoint函数,可以发现,这个函数是执行完所有的客户端请求的状态机之后才返回的,所以不存在你说的多个值执行到一半的情况,肯定是原子的动作。
from phxpaxos.
我再具体一点:
InstanceID=2中包含5个值
所以一定会调用5次用户状态机实现的
bool ExecuteForCheckpoint(const int iGroupIdx, const uint64_t llInstanceID, const std::string & sPaxosValue);
分别是
sm->ExecuteForCheckpoint(groupId, InstanceID = 2, 第1个值)
sm->ExecuteForCheckpoint(groupId, InstanceID = 2, 第2个值)
sm->ExecuteForCheckpoint(groupId, InstanceID = 2, 第3个值)
sm->ExecuteForCheckpoint(groupId, InstanceID = 2, 第4个值)
sm->ExecuteForCheckpoint(groupId, InstanceID = 2, 第5个值)
(此处若有问题请指出)
正常情况下是没有问题的。
问题是:
用户实现的ExecuteForCheckpoint中,如果在收到第1个值时就更新InstanceID(此时无法知道后面是否还会有相同InstanceID的值)。如果此时用户把InstanceID持久化后,进程挂掉,下次进程启动并恢复状态机时,第2个~第5个值没有调用ExecuteForCheckpoint。
而是直接ExecuteForCheckpoint(groupId, InstanceID = 3或更大, 对应的值)
一种场景是进程挂掉这种极端情况。
不考虑极端情况的另一种场景是Execute或ExecuteForCheckpoint执行到中间某个值时(例如第3个)返回了false,后面是不是会把已经应用到状态机的第1个值和第2个值再Execute一遍
from phxpaxos.
@niukuo 我觉得你还是没理解我的意思,batch propose只会调用SMFac :: BatchExecuteForCheckpoint,然后这个函数会一次性调用每个值的SMFac的ExecuteForCheckpoint,不会是每个值单独调用ExecuteForCheckpoint,只有所有ExecuteForCheckpoint执行完毕之后才会更新InstanceID。
具体代码:
bool SMFac :: BatchExecuteForCheckpoint(const int iGroupIdx, const uint64_t llInstanceID,
const std::string & sBodyValue)
{
BatchPaxosValues oBatchValues;
bool bSucc = oBatchValues.ParseFromArray(sBodyValue.data(), sBodyValue.size());
if (!bSucc)
{
PLG1Err("ParseFromArray fail, valuesize %zu", sBodyValue.size());
return false;
}
for (int i = 0; i < oBatchValues.values_size(); i++)
{
const PaxosValue & oValue = oBatchValues.values(i);
bool bExecuteSucc = DoExecuteForCheckpoint(iGroupIdx, llInstanceID, oValue.value(), oValue.smid());
if (!bExecuteSucc)
{
return false;
}
}
return true;
}
因为上层应用耦合问题,phxpaxos只是帮用户实现了这个接口,怎么调用还是由用户自己决定,所以在使用phxpaxos库时还是需要对整个流程有个整体的把握才行。
from phxpaxos.
CheckpointInstanceID不是用户通过实现GetCheckpointInstanceID返回给phxpaxos的吗
在进程重启时会不会遇到我上面说的"InstanceID=2的5个值执行了1个时进程挂掉下次重启时会直接从InstanceID=3开始恢复"还是说phxpaxos会保存当前每个状态机对应的checkpoint版本?
还有代码中
for (int i = 0; i < oBatchValues.values_size(); i++)
{
const PaxosValue & oValue = oBatchValues.values(i);
bool bExecuteSucc = DoExecuteForCheckpoint(iGroupIdx, llInstanceID, oValue.value(), oValue.smid());
if (!bExecuteSucc)
{
return false;
}
}
看了一下DoExecuteForCheckpoint里面并没有保存"当前执行到该Instance中第几个值"这个状态,是不是可能存在我上面说的“执行到第3个值时用户return false会导致1,2两个值被重复Execute”。
from phxpaxos.
sm.h中没有BatchExecute(ForCheckpoint)接口,所以batch propose最终到达用户的状态机时一定是对同一个instance中的5个值执行多次ExecuteForCheckpoint我的理解是否有问题?
from phxpaxos.
@niukuo 你理解的没错,假设5个值执行了一半但是挂了,但是并没有什么关系啊,1、2重复执行就重复执行呗,并没有影响数据一致性,因为只要保证instance执行单元是原子的就行,假如你说这么做有问题,那么就算是普通的propose也会有同样的问题,你自己类比一下吧。
from phxpaxos.
大部分场景下重复执行可能问题不大 可以先略过 但是上面说的使用ExecuteForCheckpoint生成快照时 是不是可能会漏掉batch propose中的后面几个值呢 执行了第一个值后已经更新了InstanceID 后面几个值没来得及执行时进程挂掉后是不是下次启动会直接从下一个InstanceID开始执行了呢
from phxpaxos.
例如进程启动后GetCheckpointInstanceID返回是2 那么下次是会从2开始执行还是3呢
from phxpaxos.
@niukuo 所有的值的状态机执行完毕之前不会更新InstanceID,部分执行的值宕机恢复时会重复去执行。具体看代码去吧,不再回复。
from phxpaxos.
好的多谢讨论
from phxpaxos.
针对单个propose来说,执行状态机和更新instance操作并不是一个事务,也就是说这两者并不是一个原子操作,所以必定导致最后一个instance会重复执行状态机。对于batch也是一样。要解决这个问题,必定是要求paxos和状态机紧密结合,协同设计,而phxpaxos是一个状态机完全开放性的通用库,暂时无法解决这个重复执行问题。但是在大部分场景,状态机自身解决重复执行最后一个instance的问题,还是比较简单的。
from phxpaxos.
以下的代码可以重现这个问题
#include <string>
#include <functional>
#include <memory>
#include <thread>
#include <mutex>
#include <condition_variable>
#include <gmock/gmock.h>
#include <gtest/gtest.h>
#include <phxpaxos/node.h>
#include <phxpaxos/sm.h>
using namespace std;
static const int gSMID = 123;
static const int gGroupId = 0;
class MockSM : public phxpaxos::StateMachine {
public:
static const int mSMID = gSMID;
uint64_t mCptVersion;
MockSM(uint64_t cptVersion = phxpaxos::NoCheckpoint)
: mCptVersion{cptVersion}
{
}
bool Execute(const int iGroupIdx, const uint64_t llInstanceID,
const std::string & sPaxosValue, phxpaxos::SMCtx* poSMCtx) override
{
return true;
}
const int SMID() const override { return mSMID; }
bool ExecuteForCheckpoint(const int iGroupIdx, const uint64_t llInstanceID,
const std::string & sPaxosValue) override
{
bool ret = mCptVersion == phxpaxos::NoCheckpoint || llInstanceID > mCptVersion;
printf("update checkpoint id = %lu, value = %s, ret = %s\n",
llInstanceID, sPaxosValue.c_str(), ret ? "true" : "false");
if (ret) {
mCptVersion = llInstanceID;
}
return ret;
}
const uint64_t GetCheckpointInstanceID(const int iGroupIdx) const override
{ return mCptVersion; }
};
class ManualResetEvent {
private:
std::mutex mMutex;
std::condition_variable mCond;
bool mSet{false};
public:
void Set()
{
std::unique_lock<std::mutex> locker(mMutex);
mSet = true;
mCond.notify_all();
}
void Reset()
{
std::unique_lock<std::mutex> locker(mMutex);
mSet = false;
}
void Wait()
{
std::unique_lock<std::mutex> locker(mMutex);
mCond.wait(locker, [this]() { return mSet; });
}
};
void BatchPropose(phxpaxos::Node *node, ManualResetEvent *e, std::string value)
{
std::mutex m;
std::unique_lock<std::mutex> locker(m);
e->Wait();
uint64_t id = (uint64_t)-1;
uint32_t idx = (uint32_t)-1;
phxpaxos::SMCtx ctx;
ctx.m_iSMID = gSMID;
int ret = node->BatchPropose(gGroupId, value, id, idx, &ctx);
printf("batch propose id = %lu, idx = %u, value = %s, ret = %d\n", id, idx, value.c_str(), ret);
ASSERT_EQ(0, ret);
}
TEST(phxpaxos, BatchTest)
{
std::string ip = "127.0.0.1";
int port = 12121;
MockSM sm;
phxpaxos::Node *node = nullptr;
phxpaxos::Options options;
{
options.sLogStoragePath = "data/log_test";
mkdir(options.sLogStoragePath.c_str(), S_IRWXU | S_IRWXG | S_IROTH | S_IXOTH);
options.oMyNode = phxpaxos::NodeInfo(ip, port);
options.vecNodeInfoList.emplace_back(options.oMyNode);
options.bUseBatchPropose = true;
options.bUseCheckpointReplayer = true;
phxpaxos::GroupSMInfo smInfo;
smInfo.iGroupIdx = gGroupId;
smInfo.vecSMList.push_back(&sm);
smInfo.bIsUseMaster = false;
options.vecGroupSMInfoList.push_back(smInfo);
}
// start a single paxos node
int ret = phxpaxos::Node::RunNode(options, node);
ASSERT_EQ(0, ret);
// batch propose
{
ManualResetEvent e;
std::vector<std::thread> workers;
workers.emplace_back(&BatchPropose, node, &e, "a");
workers.emplace_back(&BatchPropose, node, &e, "b");
workers.emplace_back(&BatchPropose, node, &e, "c");
workers.emplace_back(&BatchPropose, node, &e, "d");
workers.emplace_back(&BatchPropose, node, &e, "e");
e.Set();
for (auto &worker : workers) {
worker.join();
}
}
node->ContinueCheckpointReplayer();
puts("cpt replayer started");
// propose
{
phxpaxos::SMCtx ctx;
ctx.m_iSMID = gSMID;
uint64_t id = (uint64_t)-1;
std::string value = "single1";
int ret = node->Propose(gGroupId, value, id, &ctx);
printf("propose id = %lu, value = %s, ret = %d\n", id, value.c_str(), ret);
ASSERT_EQ(0, ret);
value = "single2";
ret = node->Propose(gGroupId, value, id, &ctx);
printf("propose id = %lu, value = %s, ret = %d\n", id, value.c_str(), ret);
ASSERT_EQ(0, ret);
}
while (sm.mCptVersion == phxpaxos::NoCheckpoint) {
sleep(1);
}
printf("now cpt version = %lu\n", sm.mCptVersion);
delete node;
node = nullptr;
printf("starting new node\n");
ret = phxpaxos::Node::RunNode(options, node);
ASSERT_EQ(0, ret);
node->ContinueCheckpointReplayer();
puts("cpt replayer started");
while (true) {
sleep(1);
}
}
构造了如下场景:
- 使用ExecuteForCheckpoint辅助生成snapshot
- 使用BatchPropose
- BatchPropose中的多个值对应同一个状态机
- 多个值ExecuteForCheckpoint时,中间某个值执行时返回了false
- 节点或进程重启
执行结果
[==========] Running 1 test from 1 test case.
[----------] Global test environment set-up.
[----------] 1 test from phxpaxos
[ RUN ] phxpaxos.BatchTest
batch propose id = 0, idx = 2, value = b, ret = 0
batch propose id = 0, idx = 4, value = d, ret = 0
batch propose id = 0, idx = 3, value = c, ret = 0
batch propose id = 0, idx = 1, value = a, ret = 0
batch propose id = 0, idx = 0, value = e, ret = 0
cpt replayer started
propose id = 1, value = single1, ret = 0
propose id = 2, value = single2, ret = 0
update checkpoint id = 0, value = e, ret = true
update checkpoint id = 0, value = a, ret = false
now cpt version = 0
update checkpoint id = 0, value = e, ret = false
update checkpoint id = 0, value = e, ret = false
update checkpoint id = 0, value = e, ret = false
starting new node
cpt replayer started
update checkpoint id = 1, value = single1, ret = true
^C
instanceID=0的5个值按顺序分别是 e, a, b, c, d
可以看到:
e在已经被执行的情况下又被执行(这个问题状态机自身可以解决)
a执行时return false后Node重启导致bcd被跳过,状态机没有机会执行。 <---- 这是问题的重点
from phxpaxos.
当你excute返回了false,决不能更新你的checkpoint instanceid,因为下次重启会首先依据你给予的checkpoint instanceid来作为execute的起始点。具体可以参考wiki里面的 checkpoint详解。
from phxpaxos.
这里没有影响的。
由于是同一个instanceid对应的多个值,checkpoint instanceid在这里实际上没有改变。(从0变成0)
第一次return true时instanceid 由 -1变成了0
from phxpaxos.
我指的execute返回false是针对整个instance来说的。很明显你应该在整个instance的所有value都execute后,才能更新checkpointid,而第一次的-1变成0已经是错误了。在batch模式下,如何更新chexkpoint是一个要仔细思考的事情。
from phxpaxos.
状态机在执行ExecuteForCheckpoint时没有机会知道这是batch中的一个值还是一个单独的instance
from phxpaxos.
能知道是最后一个固然更好,但目前为了API能向下兼容,暂时不会更改。对于一个有数据的状态机。这里有非常多的简单的办法来生成checkpoint,比如可以执行i的时候生成i-1的checkpoint,这里就不继续讨论了。
from phxpaxos.
感谢讨论,希望能够更新一下wiki
from phxpaxos.
关于batch使用注意事项,确实要更新一些文档。您有兴趣的话也可以写一下,我们会进行采纳:)
from phxpaxos.
Related Issues (20)
- checkpoint机制是不是与多个group不好同时采用?因为多个group的话,每个group都有自己的镜像数据,新机器加入的话,难道每个group都要从旧机器接收镜像追赶进度,而每个group学完都会退出进程 HOT 2
- 请问batch propose 2KB 15W,压测条件为单条数据2KB再进行一定量的合并吗? HOT 2
- phxpaxos 如何保证latest read HOT 1
- 头文件找不到 HOT 1
- 构建环境可以简化不?
- TLA model?
- 局域网测试sample失败 HOT 1
- 一个log文件(.f文件)里面存放几个instance?
- 用户写请求,提交了上次选主超时的value,导致主的租约为0,出现失主现象
- 提案通过后,为啥leader节点先执行状态机
- 关于编译手册的疑问
- 切片收发问题 HOT 1
- 在云环境中与libfaketime的使用产生冲突问题
- 请问夺主时,BeforePropose起到什么作用?
- 如果发生master切换,如何保证新 master 的数据是最新的? HOT 2
- 当 输入paxos的value过长时会有问题吗? HOT 1
- CheckpointSender 线程回收问题 HOT 1
- checkpoint 接收新的状态机 paxos_log日志,重建index后, 在下次重启前以及checkpoint文件传输前,propose超时定时器超时,写入llstance到paxos_log HOT 2
- Project status
- sample phxkv 依赖问题 HOT 1
Recommend Projects
-
React
A declarative, efficient, and flexible JavaScript library for building user interfaces.
-
Vue.js
🖖 Vue.js is a progressive, incrementally-adoptable JavaScript framework for building UI on the web.
-
Typescript
TypeScript is a superset of JavaScript that compiles to clean JavaScript output.
-
TensorFlow
An Open Source Machine Learning Framework for Everyone
-
Django
The Web framework for perfectionists with deadlines.
-
Laravel
A PHP framework for web artisans
-
D3
Bring data to life with SVG, Canvas and HTML. 📊📈🎉
-
Recommend Topics
-
javascript
JavaScript (JS) is a lightweight interpreted programming language with first-class functions.
-
web
Some thing interesting about web. New door for the world.
-
server
A server is a program made to process requests and deliver data to clients.
-
Machine learning
Machine learning is a way of modeling and interpreting data that allows a piece of software to respond intelligently.
-
Visualization
Some thing interesting about visualization, use data art
-
Game
Some thing interesting about game, make everyone happy.
Recommend Org
-
Facebook
We are working to build community through open source technology. NB: members must have two-factor auth.
-
Microsoft
Open source projects and samples from Microsoft.
-
Google
Google ❤️ Open Source for everyone.
-
Alibaba
Alibaba Open Source for everyone
-
D3
Data-Driven Documents codes.
-
Tencent
China tencent open source team.
from phxpaxos.