Skip to content

Commit

Permalink
[Enhancement] Add a check about accessible of BE ports before remove …
Browse files Browse the repository at this point in the history
…the blacklist(#31750) (#31893)

Currently, a backend is added to the blacklist when it encounters an RPC error, but removing it from the blacklist only takes the heartbeat port into consideration. This pull request adds a check for all ports (BE port, BE BRPC port, BE HTTP port). If any one of the ports is not accessible, the backend will not be removed from the blacklist.
  • Loading branch information
chaoyli authored Sep 27, 2023
1 parent f51d653 commit a3b58a0
Show file tree
Hide file tree
Showing 3 changed files with 75 additions and 9 deletions.
15 changes: 15 additions & 0 deletions fe/fe-core/src/main/java/com/starrocks/common/util/NetUtils.java
Original file line number Diff line number Diff line change
Expand Up @@ -27,6 +27,7 @@

import java.io.IOException;
import java.net.InetAddress;
import java.net.InetSocketAddress;
import java.net.NetworkInterface;
import java.net.Socket;
import java.net.SocketException;
Expand Down Expand Up @@ -84,4 +85,18 @@ public static Pair<String, String> getIpAndFqdnByHost(String host) throws Unknow
}
return new Pair<>(ip, fqdn);
}

/**
 * Checks whether every port in {@code ports} on {@code host} accepts a TCP connection.
 *
 * <p>Ports are probed one at a time, in list order, and probing stops at the first
 * port that cannot be reached. Each connection attempt is bounded by a fixed timeout.
 *
 * @param host  host name or IP address to probe
 * @param ports ports to probe; an empty list yields {@code true}
 * @return {@code true} if a connection was established to every port; {@code false} if
 *         any port is unreachable, or if the host or a port value is null or out of range
 */
public static boolean checkAccessibleForAllPorts(String host, List<Integer> ports) {
    final int timeout = 1000; // Timeout in milliseconds, applied to each connection attempt
    for (Integer port : ports) {
        // InetSocketAddress throws an unchecked IllegalArgumentException for a null host or a
        // port outside 0..65535, which the IOException handler below would not catch. Treat
        // such input as "not accessible" instead of letting the exception reach the caller.
        if (host == null || port == null || port < 0 || port > 65535) {
            return false;
        }
        try (Socket socket = new Socket()) {
            socket.connect(new InetSocketAddress(host, port), timeout);
        } catch (IOException e) {
            // Any connection failure means this port is not accessible; stop probing.
            return false;
        }
    }
    return true;
}
}
41 changes: 32 additions & 9 deletions fe/fe-core/src/main/java/com/starrocks/qe/SimpleScheduler.java
Original file line number Diff line number Diff line change
Expand Up @@ -26,6 +26,7 @@
import com.google.common.collect.Maps;
import com.starrocks.common.Config;
import com.starrocks.common.Reference;
import com.starrocks.common.util.NetUtils;
import com.starrocks.server.GlobalStateMgr;
import com.starrocks.system.Backend;
import com.starrocks.system.ComputeNode;
Expand All @@ -35,6 +36,8 @@
import org.apache.logging.log4j.LogManager;
import org.apache.logging.log4j.Logger;

import java.util.ArrayList;
import java.util.Collections;
import java.util.Iterator;
import java.util.List;
import java.util.Map;
Expand Down Expand Up @@ -244,6 +247,19 @@ public static boolean isInBlacklist(long backendId) {
}
}

// This function is used for unit tests
/**
 * Removes the given backend from the blacklist while holding the scheduler lock.
 *
 * @param backendID id of the backend to remove; may be {@code null}
 * @return {@code true} if {@code backendID} is {@code null} or an entry was removed,
 *         {@code false} if the backend was not in the blacklist
 */
public static boolean removeFromBlacklist(Long backendID) {
    if (backendID != null) {
        lock.lock();
        try {
            boolean wasBlacklisted = blacklistBackends.remove(backendID) != null;
            return wasBlacklisted;
        } finally {
            lock.unlock();
        }
    }
    // A null id is reported as success without taking the lock.
    return true;
}

private static class UpdateBlacklistThread implements Runnable {
private static final Logger LOG = LogManager.getLogger(UpdateBlacklistThread.class);
private static Thread thread;
Expand Down Expand Up @@ -272,23 +288,30 @@ public void run() {
Map.Entry<Long, Integer> entry = iterator.next();
Long backendId = entry.getKey();

// remove from blacklist if
// 1. backend does not exist anymore
// 2. backend is alive
if (clusterInfoService.getBackend(backendId) == null
|| clusterInfoService.checkBackendAvailable(backendId)) {
// A backend is removed from the blacklist when any one of the following holds:
// 1. The backend is null, which means it has been removed.
// 2. The backend passes the availability check and all of its ports (BE, BRPC, HTTP) are accessible.
// 3. Its remaining retry count, decremented on each pass, reaches zero.
Backend backend = clusterInfoService.getBackend(backendId);
if (backend == null) {
iterator.remove();
LOG.debug("remove backendID {} which is alive", backendId);
LOG.warn("remove backendID {} from blacklist", backendId);
} else if (clusterInfoService.checkBackendAvailable(backendId)) {
String host = backend.getHost();
List<Integer> ports = new ArrayList<Integer>();
Collections.addAll(ports, backend.getBePort(), backend.getBrpcPort(), backend.getHttpPort());
if (NetUtils.checkAccessibleForAllPorts(host, ports)) {
iterator.remove();
LOG.warn("remove backendID {} from blacklist", backendId);;
}
} else {
// 3. max try time is reach
Integer retryTimes = entry.getValue();
retryTimes = retryTimes - 1;
if (retryTimes <= 0) {
iterator.remove();
LOG.warn("remove backendID {}. reach max try time", backendId);
LOG.warn("remove backendID {} from blacklist", backendId);
} else {
entry.setValue(retryTimes);
LOG.debug("blacklistBackends backendID={} retryTimes={}", backendId, retryTimes);
}
}
}
Expand Down
28 changes: 28 additions & 0 deletions fe/fe-core/src/test/java/com/starrocks/qe/SimpleSchedulerTest.java
Original file line number Diff line number Diff line change
Expand Up @@ -25,6 +25,7 @@
import com.google.common.collect.Maps;
import com.starrocks.common.Config;
import com.starrocks.common.Reference;
import com.starrocks.common.util.NetUtils;
import com.starrocks.persist.EditLog;
import com.starrocks.server.GlobalStateMgr;
import com.starrocks.system.Backend;
Expand All @@ -36,6 +37,7 @@
import org.junit.Before;

import java.util.ArrayList;
import java.util.Collections;
import java.util.List;
import java.util.Map;

Expand Down Expand Up @@ -201,4 +203,30 @@ public void testBlackList() {
address = SimpleScheduler.getBackendHost(immutableThreeBackends, ref);
Assert.assertNull(address);
}

// @Test
public void testRemoveBackendFromBlackList() {
    // Scenario: a blacklisted backend must not be picked by the scheduler, and must be
    // selectable again once it has been explicitly removed from the blacklist.
    Config.heartbeat_timeout_second = Integer.MAX_VALUE;
    TNetworkAddress address = null;

    Backend backendA = new Backend(100, "addressA", 0);
    backendA.updateOnce(0, 0, 0);
    Map<Long, Backend> backends = Maps.newHashMap();
    backends.put((long) 100, backendA);
    ImmutableMap<Long, Backend> immutableBackends = ImmutableMap.copyOf(backends);

    // While blacklisted, the only backend is skipped, so no host is returned.
    SimpleScheduler.addToBlacklist(Long.valueOf(100));
    address = SimpleScheduler.getBackendHost(immutableBackends, ref);
    Assert.assertNull(address);

    // The port probe against the placeholder host "addressA" is expected to fail.
    String host = backendA.getHost();
    List<Integer> ports = new ArrayList<>();
    Collections.addAll(ports, backendA.getBePort(), backendA.getBrpcPort(), backendA.getHttpPort());
    boolean accessible = NetUtils.checkAccessibleForAllPorts(host, ports);
    Assert.assertFalse(accessible);

    // After explicit removal the backend must be selectable again.
    SimpleScheduler.removeFromBlacklist(Long.valueOf(100));
    address = SimpleScheduler.getBackendHost(immutableBackends, ref);
    // Fail with a clear assertion instead of an NPE when no backend is returned.
    Assert.assertNotNull(address);
    // JUnit's contract is assertEquals(expected, actual).
    Assert.assertEquals("addressA", address.hostname);
}
}

0 comments on commit a3b58a0

Please sign in to comment.