Error recovery for network & member stream watchers

This commit is contained in:
Grant Limberg 2020-05-20 11:42:51 -07:00
parent 0f17508cac
commit 8b8399efbc
No known key found for this signature in database
GPG Key ID: 2BA62CCABBB4095A

View File

@ -735,50 +735,54 @@ void PostgreSQL::_membersWatcher_Redis() {
std::string key = "member-stream:{" + std::string(_myAddress.toString(buf)) + "}"; std::string key = "member-stream:{" + std::string(_myAddress.toString(buf)) + "}";
while (_run == 1) { while (_run == 1) {
json tmp; try {
std::unordered_map<std::string, ItemStream> result; json tmp;
if (_rc->clusterMode) { std::unordered_map<std::string, ItemStream> result;
_cluster->xread(key, "$", std::chrono::seconds(1), 0, std::inserter(result, result.end())); if (_rc->clusterMode) {
} else { _cluster->xread(key, "$", std::chrono::seconds(1), 0, std::inserter(result, result.end()));
_redis->xread(key, "$", std::chrono::seconds(1), 0, std::inserter(result, result.end())); } else {
} _redis->xread(key, "$", std::chrono::seconds(1), 0, std::inserter(result, result.end()));
if (!result.empty()) { }
for (auto element : result) { if (!result.empty()) {
#ifdef ZT_TRACE for (auto element : result) {
fprintf(stdout, "Received notification from: %s\n", element.first.c_str()); #ifdef ZT_TRACE
#endif fprintf(stdout, "Received notification from: %s\n", element.first.c_str());
for (auto rec : element.second) { #endif
std::string id = rec.first; for (auto rec : element.second) {
auto attrs = rec.second; std::string id = rec.first;
#ifdef ZT_TRACE auto attrs = rec.second;
fprintf(stdout, "Record ID: %s\n", id.c_str()); #ifdef ZT_TRACE
fprintf(stdout, "attrs len: %lu\n", attrs.size()); fprintf(stdout, "Record ID: %s\n", id.c_str());
#endif fprintf(stdout, "attrs len: %lu\n", attrs.size());
for (auto a : attrs) { #endif
#ifdef ZT_TRACE for (auto a : attrs) {
fprintf(stdout, "key: %s\nvalue: %s\n", a.first.c_str(), a.second.c_str()); #ifdef ZT_TRACE
#endif fprintf(stdout, "key: %s\nvalue: %s\n", a.first.c_str(), a.second.c_str());
try { #endif
tmp = json::parse(a.second); try {
json &ov = tmp["old_val"]; tmp = json::parse(a.second);
json &nv = tmp["new_val"]; json &ov = tmp["old_val"];
json oldConfig, newConfig; json &nv = tmp["new_val"];
if (ov.is_object()) oldConfig = ov; json oldConfig, newConfig;
if (nv.is_object()) newConfig = nv; if (ov.is_object()) oldConfig = ov;
if (oldConfig.is_object()||newConfig.is_object()) { if (nv.is_object()) newConfig = nv;
_memberChanged(oldConfig,newConfig,(this->_ready >= 2)); if (oldConfig.is_object()||newConfig.is_object()) {
_memberChanged(oldConfig,newConfig,(this->_ready >= 2));
}
} catch (...) {
fprintf(stderr, "json parse error in networkWatcher_Redis\n");
} }
} catch (...) {
fprintf(stderr, "json parse error in networkWatcher_Redis\n");
} }
} if (_rc->clusterMode) {
if (_rc->clusterMode) { _cluster->xdel(key, id);
_cluster->xdel(key, id); } else {
} else { _redis->xdel(key, id);
_redis->xdel(key, id); }
} }
} }
} }
} catch (sw::redis::Error &e) {
fprintf(stderr, "Error in Redis members watcher: %s\n", e.what());
} }
} }
fprintf(stderr, "membersWatcher ended\n"); fprintf(stderr, "membersWatcher ended\n");
@ -856,51 +860,55 @@ void PostgreSQL::_networksWatcher_Redis() {
std::string key = "network-stream:{" + std::string(_myAddress.toString(buf)) + "}"; std::string key = "network-stream:{" + std::string(_myAddress.toString(buf)) + "}";
while (_run == 1) { while (_run == 1) {
json tmp; try {
std::unordered_map<std::string, ItemStream> result; json tmp;
if (_rc->clusterMode) { std::unordered_map<std::string, ItemStream> result;
_cluster->xread(key, "$", std::chrono::seconds(1), 0, std::inserter(result, result.end())); if (_rc->clusterMode) {
} else { _cluster->xread(key, "$", std::chrono::seconds(1), 0, std::inserter(result, result.end()));
_redis->xread(key, "$", std::chrono::seconds(1), 0, std::inserter(result, result.end())); } else {
} _redis->xread(key, "$", std::chrono::seconds(1), 0, std::inserter(result, result.end()));
}
if (!result.empty()) {
for (auto element : result) { if (!result.empty()) {
for (auto element : result) {
#ifdef ZT_TRACE #ifdef ZT_TRACE
fprintf(stdout, "Received notification from: %s\n", element.first.c_str()); fprintf(stdout, "Received notification from: %s\n", element.first.c_str());
#endif #endif
for (auto rec : element.second) { for (auto rec : element.second) {
std::string id = rec.first; std::string id = rec.first;
auto attrs = rec.second; auto attrs = rec.second;
#ifdef ZT_TRACE #ifdef ZT_TRACE
fprintf(stdout, "Record ID: %s\n", id.c_str()); fprintf(stdout, "Record ID: %s\n", id.c_str());
fprintf(stdout, "attrs len: %lu\n", attrs.size()); fprintf(stdout, "attrs len: %lu\n", attrs.size());
#endif #endif
for (auto a : attrs) { for (auto a : attrs) {
#ifdef ZT_TRACE #ifdef ZT_TRACE
fprintf(stdout, "key: %s\nvalue: %s\n", a.first.c_str(), a.second.c_str()); fprintf(stdout, "key: %s\nvalue: %s\n", a.first.c_str(), a.second.c_str());
#endif #endif
try { try {
tmp = json::parse(a.second); tmp = json::parse(a.second);
json &ov = tmp["old_val"]; json &ov = tmp["old_val"];
json &nv = tmp["new_val"]; json &nv = tmp["new_val"];
json oldConfig, newConfig; json oldConfig, newConfig;
if (ov.is_object()) oldConfig = ov; if (ov.is_object()) oldConfig = ov;
if (nv.is_object()) newConfig = nv; if (nv.is_object()) newConfig = nv;
if (oldConfig.is_object()||newConfig.is_object()) { if (oldConfig.is_object()||newConfig.is_object()) {
_networkChanged(oldConfig,newConfig,(this->_ready >= 2)); _networkChanged(oldConfig,newConfig,(this->_ready >= 2));
}
} catch (...) {
fprintf(stderr, "json parse error in networkWatcher_Redis\n");
} }
} catch (...) {
fprintf(stderr, "json parse error in networkWatcher_Redis\n");
} }
} if (_rc->clusterMode) {
if (_rc->clusterMode) { _cluster->xdel(key, id);
_cluster->xdel(key, id); } else {
} else { _redis->xdel(key, id);
_redis->xdel(key, id); }
} }
} }
} }
} catch (sw::redis::Error &e) {
fprintf(stderr, "Error in Redis networks watcher: %s\n", e.what());
} }
} }
fprintf(stderr, "networksWatcher ended\n"); fprintf(stderr, "networksWatcher ended\n");