Skip to content

Commit

Permalink
[Feature] Support restoring from a cluster snapshot for shared-data (…
Browse files Browse the repository at this point in the history
…part 1)

Signed-off-by: xiangguangyxg <[email protected]>
  • Loading branch information
xiangguangyxg committed Dec 12, 2024
1 parent 33ac748 commit c7b9e88
Show file tree
Hide file tree
Showing 9 changed files with 574 additions and 8 deletions.
7 changes: 5 additions & 2 deletions bin/start_fe.sh
Original file line number Diff line number Diff line change
Expand Up @@ -25,6 +25,7 @@ OPTS=$(getopt \
-l 'daemon' \
-l 'helper:' \
-l 'host_type:' \
-l 'restore_snapshot' \
-l 'debug' \
-l 'logconsole' \
-- "$@")
Expand All @@ -34,6 +35,7 @@ eval set -- "$OPTS"
RUN_DAEMON=0
HELPER=
HOST_TYPE=
RESTORE_SNAPSHOT=
ENABLE_DEBUGGER=0
RUN_LOG_CONSOLE=${SYS_LOG_TO_CONSOLE:-0}
# min jdk version required
Expand All @@ -43,6 +45,7 @@ while true; do
--daemon) RUN_DAEMON=1 ; shift ;;
--helper) HELPER=$2 ; shift 2 ;;
--host_type) HOST_TYPE=$2 ; shift 2 ;;
--restore_snapshot) RESTORE_SNAPSHOT="-restore_snapshot" ; shift ;;
--debug) ENABLE_DEBUGGER=1 ; shift ;;
--logconsole) RUN_LOG_CONSOLE=1 ; shift ;;
--) shift ; break ;;
Expand Down Expand Up @@ -228,7 +231,7 @@ echo "start time: $(date), server uptime: $(uptime)"

# StarRocksFE java process will write its process id into $pidfile
if [ ${RUN_DAEMON} -eq 1 ]; then
nohup $LIMIT $JAVA $final_java_opt com.starrocks.StarRocksFE ${HELPER} ${HOST_TYPE} "$@" </dev/null &
nohup $LIMIT $JAVA $final_java_opt com.starrocks.StarRocksFE ${HELPER} ${HOST_TYPE} ${RESTORE_SNAPSHOT} "$@" </dev/null &
else
exec $LIMIT $JAVA $final_java_opt com.starrocks.StarRocksFE ${HELPER} ${HOST_TYPE} "$@" </dev/null
exec $LIMIT $JAVA $final_java_opt com.starrocks.StarRocksFE ${HELPER} ${HOST_TYPE} ${RESTORE_SNAPSHOT} "$@" </dev/null
fi
1 change: 1 addition & 0 deletions build.sh
Original file line number Diff line number Diff line change
Expand Up @@ -511,6 +511,7 @@ if [ ${BUILD_FE} -eq 1 -o ${BUILD_SPARK_DPP} -eq 1 ]; then
cp -r -p ${STARROCKS_HOME}/conf/udf_security.policy ${STARROCKS_OUTPUT}/fe/conf/
cp -r -p ${STARROCKS_HOME}/conf/hadoop_env.sh ${STARROCKS_OUTPUT}/fe/conf/
cp -r -p ${STARROCKS_HOME}/conf/core-site.xml ${STARROCKS_OUTPUT}/fe/conf/
cp -r -p ${STARROCKS_HOME}/conf/restore_snapshot.yaml ${STARROCKS_OUTPUT}/fe/conf/

rm -rf ${STARROCKS_OUTPUT}/fe/lib/*
cp -r -p ${STARROCKS_HOME}/fe/fe-core/target/lib/* ${STARROCKS_OUTPUT}/fe/lib/
Expand Down
39 changes: 39 additions & 0 deletions conf/restore_snapshot.yaml
Original file line number Diff line number Diff line change
@@ -0,0 +1,39 @@
# Do not include the leader FE in this list.
#frontends:
# - host: 172.26.92.1
# edit_log_port: 9010
# type: follower #default follower
# - host: 172.26.92.2
# edit_log_port: 9010
# type: observer

#compute_nodes:
# - host: 172.26.92.11
# heartbeat_service_port: 9050
# - host: 172.26.92.12
# heartbeat_service_port: 9050

# used for restoring a cloned snapshot
#storage_volumes:
# - name: my_s3_volume
# type: S3
# location: s3://defaultbucket/test/
# comment: my s3 volume
# properties:
# - key: aws.s3.region
# value: us-west-2
# - key: aws.s3.endpoint
# value: https://s3.us-west-2.amazonaws.com
# - key: aws.s3.access_key
# value: xxxxxxxxxx
# - key: aws.s3.secret_key
# value: yyyyyyyyyy
# - name: my_hdfs_volume
# type: HDFS
# location: hdfs://127.0.0.1:9000/sr/test/
# comment: my hdfs volume
# properties:
# - key: hadoop.security.authentication
# value: simple
# - key: username
# value: starrocks
6 changes: 6 additions & 0 deletions fe/fe-core/src/main/java/com/starrocks/StarRocksFE.java
Original file line number Diff line number Diff line change
Expand Up @@ -52,6 +52,7 @@
import com.starrocks.qe.CoordinatorMonitor;
import com.starrocks.qe.QeService;
import com.starrocks.server.GlobalStateMgr;
import com.starrocks.server.RestoreSnapshotMgr;
import com.starrocks.server.RunMode;
import com.starrocks.service.ExecuteEnv;
import com.starrocks.service.FrontendOptions;
Expand Down Expand Up @@ -119,6 +120,8 @@ public static void start(String starRocksDir, String pidDir, String[] args) {
// set dns cache ttl
java.security.Security.setProperty("networkaddress.cache.ttl", "60");

RestoreSnapshotMgr.init(starRocksDir + "/conf/restore_snapshot.yaml", args);

// check meta dir
MetaHelper.checkMetaDir();

Expand Down Expand Up @@ -181,6 +184,8 @@ public static void start(String starRocksDir, String pidDir, String[] args) {

LOG.info("FE started successfully");

RestoreSnapshotMgr.finishRestoring();

while (!stopped) {
Thread.sleep(2000);
}
Expand Down Expand Up @@ -221,6 +226,7 @@ private static CommandLineOptions parseArgs(String[] args) {
CommandLineParser commandLineParser = new BasicParser();
Options options = new Options();
options.addOption("ht", "host_type", false, "Specify fe start use ip or fqdn");
options.addOption("rs", "restore_snapshot", false, "Specify fe start to restore from a snapshot");
options.addOption("v", "version", false, "Print the version of StarRocks Frontend");
options.addOption("h", "helper", true, "Specify the helper node when joining a bdb je replication group");
options.addOption("b", "bdb", false, "Run bdbje debug tools");
Expand Down
Original file line number Diff line number Diff line change
@@ -0,0 +1,266 @@
// Licensed to the Apache Software Foundation (ASF) under one
// or more contributor license agreements. See the NOTICE file
// distributed with this work for additional information
// regarding copyright ownership. The ASF licenses this file
// to you under the Apache License, Version 2.0 (the
// "License"); you may not use this file except in compliance
// with the License. You may obtain a copy of the License at
//
// http://www.apache.org/licenses/LICENSE-2.0
//
// Unless required by applicable law or agreed to in writing,
// software distributed under the License is distributed on an
// "AS IS" BASIS, WITHOUT WARRANTIES OR CONDITIONS OF ANY
// KIND, either express or implied. See the License for the
// specific language governing permissions and limitations
// under the License.

package com.starrocks.common;

import com.fasterxml.jackson.annotation.JsonCreator;
import com.fasterxml.jackson.annotation.JsonProperty;
import com.fasterxml.jackson.annotation.JsonValue;
import com.fasterxml.jackson.core.JsonParser;
import com.fasterxml.jackson.core.JsonProcessingException;
import com.fasterxml.jackson.core.type.TypeReference;
import com.fasterxml.jackson.databind.DeserializationContext;
import com.fasterxml.jackson.databind.JsonDeserializer;
import com.fasterxml.jackson.databind.JsonMappingException;
import com.fasterxml.jackson.databind.ObjectMapper;
import com.fasterxml.jackson.databind.annotation.JsonDeserialize;
import com.fasterxml.jackson.dataformat.yaml.YAMLFactory;
import org.apache.logging.log4j.LogManager;
import org.apache.logging.log4j.Logger;

import java.io.File;
import java.io.IOException;
import java.util.HashMap;
import java.util.List;
import java.util.Locale;
import java.util.Map;

/**
 * Parsed form of the restore-snapshot configuration file
 * (conf/restore_snapshot.yaml), used when the FE is started with
 * {@code -restore_snapshot} to rebuild cluster membership and storage
 * volumes from a cluster snapshot.
 *
 * <p>The YAML file may declare three optional top-level sections:
 * {@code frontends}, {@code compute_nodes} and {@code storage_volumes};
 * each corresponding getter returns {@code null} when the section is absent.
 */
public class RestoreSnapshotConfig {
    private static final Logger LOG = LogManager.getLogger(RestoreSnapshotConfig.class);

    /** A frontend node entry from the "frontends" section. */
    public static class Frontend {
        /** Frontend role. (Nested enums are implicitly static.) */
        public enum FrontendType {
            FOLLOWER,
            OBSERVER;

            /**
             * Parses the YAML value case-insensitively (e.g. "follower").
             * Locale.ROOT keeps the casing locale-independent (e.g. avoids
             * the Turkish dotless-i problem).
             */
            @JsonCreator
            public static FrontendType forValue(String value) {
                return FrontendType.valueOf(value.toUpperCase(Locale.ROOT));
            }

            /** Serializes back to the lower-case name used in the YAML. */
            @JsonValue
            public String toValue() {
                return name().toLowerCase(Locale.ROOT);
            }
        }

        @JsonProperty("host")
        private String host;

        @JsonProperty("edit_log_port")
        private int editLogPort;

        // Defaults to FOLLOWER when "type" is omitted in the YAML.
        @JsonProperty("type")
        private FrontendType type = FrontendType.FOLLOWER;

        public String getHost() {
            return host;
        }

        public void setHost(String host) {
            this.host = host;
        }

        public int getEditLogPort() {
            return editLogPort;
        }

        public void setEditLogPort(int editLogPort) {
            this.editLogPort = editLogPort;
        }

        public FrontendType getType() {
            return type;
        }

        public void setType(FrontendType type) {
            this.type = type;
        }

        public boolean isFollower() {
            return this.type == FrontendType.FOLLOWER;
        }

        public boolean isObserver() {
            return this.type == FrontendType.OBSERVER;
        }

        @Override
        public String toString() {
            return "Frontend [host=" + host + ", editLogPort=" + editLogPort + ", type=" + type + "]";
        }
    }

    /** A compute node entry from the "compute_nodes" section. */
    public static class ComputeNode {
        @JsonProperty("host")
        private String host;

        @JsonProperty("heartbeat_service_port")
        private int heartbeatServicePort;

        public String getHost() {
            return host;
        }

        public void setHost(String host) {
            this.host = host;
        }

        public int getHeartbeatServicePort() {
            return heartbeatServicePort;
        }

        public void setHeartbeatServicePort(int heartbeatServicePort) {
            this.heartbeatServicePort = heartbeatServicePort;
        }

        @Override
        public String toString() {
            return "ComputeNode [host=" + host + ", heartbeatServicePort=" + heartbeatServicePort + "]";
        }
    }

    /** A storage volume entry from the "storage_volumes" section. */
    public static class StorageVolume {
        /** Supported storage backends. (Nested enums are implicitly static.) */
        public enum StorageVolumeType {
            S3,
            HDFS,
            AZBLOB;

            /** Parses the YAML value case-insensitively, locale-independent. */
            @JsonCreator
            public static StorageVolumeType forValue(String value) {
                return StorageVolumeType.valueOf(value.toUpperCase(Locale.ROOT));
            }

            /** Serializes back to the lower-case name used in the YAML. */
            @JsonValue
            public String toValue() {
                return name().toLowerCase(Locale.ROOT);
            }
        }

        /**
         * Converts the YAML list of {@code {key: ..., value: ...}} entries
         * into a flat Map. Entries missing (or with empty) 'key' or 'value'
         * are rejected with a mapping error that carries the token location.
         */
        private static class PropertiesDeserializer extends JsonDeserializer<Map<String, String>> {

            @Override
            public Map<String, String> deserialize(JsonParser parser, DeserializationContext context)
                    throws IOException {
                // JsonProcessingException is a subclass of IOException, so a
                // single throws clause covers both.
                ObjectMapper mapper = (ObjectMapper) parser.getCodec();
                List<Map<String, String>> list = mapper.readValue(parser,
                        new TypeReference<List<Map<String, String>>>() {
                        });

                Map<String, String> properties = new HashMap<>();
                for (Map<String, String> entry : list) {
                    String key = entry.get("key");
                    String value = entry.get("value");
                    if (key == null || key.isEmpty() || value == null || value.isEmpty()) {
                        // JsonMappingException.from attaches the parser's
                        // current location; it replaces the previous
                        // anonymous-subclass workaround for
                        // JsonProcessingException's protected constructor and
                        // remains catchable as JsonProcessingException.
                        throw JsonMappingException.from(parser,
                                "Missing 'key' or 'value' in properties entry");
                    }
                    properties.put(key, value);
                }
                return properties;
            }
        }

        @JsonProperty("name")
        private String name;

        @JsonProperty("type")
        private StorageVolumeType type;

        @JsonProperty("location")
        private String location;

        @JsonProperty("comment")
        private String comment;

        @JsonProperty("properties")
        @JsonDeserialize(using = PropertiesDeserializer.class)
        private Map<String, String> properties;

        public String getName() {
            return name;
        }

        public void setName(String name) {
            this.name = name;
        }

        public StorageVolumeType getType() {
            return type;
        }

        public void setType(StorageVolumeType type) {
            this.type = type;
        }

        public String getLocation() {
            return location;
        }

        public void setLocation(String location) {
            this.location = location;
        }

        public String getComment() {
            return comment;
        }

        public void setComment(String comment) {
            this.comment = comment;
        }

        public Map<String, String> getProperties() {
            return properties;
        }

        public void setProperties(Map<String, String> properties) {
            this.properties = properties;
        }
    }

    // Optional sections; each field is null when absent from the YAML file.
    @JsonProperty("frontends")
    private List<Frontend> frontends;

    @JsonProperty("compute_nodes")
    private List<ComputeNode> computeNodes;

    @JsonProperty("storage_volumes")
    private List<StorageVolume> storageVolumes;

    public List<Frontend> getFrontends() {
        return frontends;
    }

    public List<ComputeNode> getComputeNodes() {
        return computeNodes;
    }

    public List<StorageVolume> getStorageVolumes() {
        return storageVolumes;
    }

    /**
     * Loads and parses the given restore-snapshot YAML file.
     *
     * @param restoreSnapshotYamlFile path to restore_snapshot.yaml
     * @return the parsed configuration
     * @throws RuntimeException wrapping the underlying cause if the file
     *         cannot be read or parsed
     */
    public static RestoreSnapshotConfig load(String restoreSnapshotYamlFile) {
        try {
            ObjectMapper objectMapper = new ObjectMapper(new YAMLFactory());
            return objectMapper.readValue(new File(restoreSnapshotYamlFile),
                    RestoreSnapshotConfig.class);
        } catch (Exception e) {
            LOG.warn("Failed to load restore snapshot config {} ", restoreSnapshotYamlFile, e);
            throw new RuntimeException(e);
        }
    }
}
Loading

0 comments on commit c7b9e88

Please sign in to comment.