Fix workload recovery func post netsplit or shutdown
Signed-off-by: Mahesh Shetty <[email protected]>
Mahesh Shetty authored and Mahesh Shetty committed Nov 14, 2024
1 parent 4e0b0ee commit bc00e7c
Showing 3 changed files with 134 additions and 141 deletions.
207 changes: 125 additions & 82 deletions ocs_ci/helpers/stretchcluster_helper.py
@@ -1,22 +1,45 @@
import re
import logging

from ocs_ci.ocs.utils import retry
from ocs_ci.helpers.helpers import (
modify_deployment_replica_count,
modify_statefulset_replica_count,
modify_job_parallelism_count,
)
from ocs_ci.ocs.exceptions import CommandFailed
from ocs_ci.ocs.constants import LOGREADER_CEPHFS_LABEL, LOGWRITER_RBD_LABEL
from ocs_ci.ocs.exceptions import CommandFailed, UnexpectedBehaviour
from ocs_ci.ocs.resources.pod import (
wait_for_pods_deletion,
get_not_running_pods,
)
from ocs_ci.ocs import constants
from ocs_ci.ocs.ocp import OCP


logger = logging.getLogger(__name__)


def recover_workload_pods_post_recovery(sc_obj, pods_not_running):
def check_for_logwriter_workload_pods(
sc_obj,
):
try:
sc_obj.get_logwriter_reader_pods(label=constants.LOGWRITER_CEPHFS_LABEL)
sc_obj.get_logwriter_reader_pods(
label=constants.LOGREADER_CEPHFS_LABEL,
statuses=[constants.STATUS_RUNNING, constants.STATUS_COMPLETED],
)
sc_obj.get_logwriter_reader_pods(
label=constants.LOGWRITER_RBD_LABEL, exp_num_replicas=2
)
except UnexpectedBehaviour:
logger.info("some pods are not running, so trying the work-around")
recover_workload_pods_post_recovery(sc_obj)
logger.info("All the workloads pods are successfully up and running")


@retry(UnexpectedBehaviour, tries=5, delay=10, backoff=1)
def recover_workload_pods_post_recovery(sc_obj, pods_not_running=None):

"""
There seems to be a known issue https://bugzilla.redhat.com/show_bug.cgi?id=2244353
@@ -28,6 +51,13 @@ def recover_workload_pods_post_recovery(sc_obj, pods_not_running):
pods_not_running (List): Optional list of Pod objects that are not in Running state; if not provided, they are fetched from the stretch cluster namespace
"""

# fetch the not running pods
if not pods_not_running:
pods_not_running = get_not_running_pods(
namespace=constants.STRETCH_CLUSTER_NAMESPACE
)

# try to scale down and scale up the deployment/sts
# if any of the mentioned errors are found
error_messages = [
@@ -45,116 +75,129 @@ def check_errors_regex(desc_out, err_msgs):
return bool(re.search(pattern, desc_out))

pod_names = [pod.name for pod in pods_not_running]
logger.info(f"Pods not running: {pod_names}")
logger.info(f"These are the pods not running: {pod_names}")
scaled_down = []

for pod in pods_not_running:
# get the labels from the pod data
labels = str(pod.get_labels())

# make sure these pods are not already scaled down
if any(
[
constants.LOGWRITER_CEPHFS_LABEL.split("=")[1] in labels
and constants.LOGWRITER_CEPHFS_LABEL in scaled_down,
constants.LOGWRITER_RBD_LABEL.split("=")[1] in labels
and constants.LOGWRITER_RBD_LABEL in scaled_down,
constants.LOGREADER_CEPHFS_LABEL.split("=")[1] in labels
and constants.LOGREADER_CEPHFS_LABEL in scaled_down,
]
):
continue

# get the pod describe output
try:
desc_out = OCP().exec_oc_cmd(
command=f"describe pod {pod.name}", out_yaml_format=False
)
except CommandFailed as e:
if "NotFound" not in e.args[0]:
raise e
else:
continue

# if any of the above mentioned error messages are present in the
# describe output, we scale down the respective deployment/job/statefulset
if check_errors_regex(desc_out, error_messages):
# Delete the ContainerStatusUnknown error pods
if pod.status() == constants.STATUS_CONTAINER_STATUS_UNKNOWN:
pod.delete()
workload_labels = [
constants.LOGWRITER_CEPHFS_LABEL,
LOGREADER_CEPHFS_LABEL,
LOGWRITER_RBD_LABEL,
]

if (
constants.LOGWRITER_CEPHFS_LABEL.split("=")[1] in labels
and constants.LOGWRITER_CEPHFS_LABEL not in scaled_down
):
modify_deployment_replica_count(
deployment_name=constants.LOGWRITER_CEPHFS_NAME,
replica_count=0,
namespace=constants.STRETCH_CLUSTER_NAMESPACE,
)
wait_for_pods_deletion(
constants.LOGWRITER_CEPHFS_LABEL,
timeout=300,
namespace=constants.STRETCH_CLUSTER_NAMESPACE,
)
scaled_down.append(constants.LOGWRITER_CEPHFS_LABEL)
elif (
constants.LOGWRITER_RBD_LABEL.split("=")[1] in labels
and constants.LOGWRITER_RBD_LABEL not in scaled_down
):

modify_statefulset_replica_count(
statefulset_name=constants.LOGWRITER_RBD_NAME,
replica_count=0,
namespace=constants.STRETCH_CLUSTER_NAMESPACE,
)
wait_for_pods_deletion(
constants.LOGWRITER_RBD_LABEL,
timeout=300,
namespace=constants.STRETCH_CLUSTER_NAMESPACE,
for app_label in workload_labels:
for pod in pods_not_running:

# Delete any pod that is in Error or ContainerStatusUnknown status
if pod.status() in [
constants.STATUS_CONTAINER_STATUS_UNKNOWN,
constants.STATUS_ERROR,
]:
logger.info(
f"Pod {pod.name} in either {constants.STATUS_CONTAINER_STATUS_UNKNOWN} "
f"or {constants.STATUS_ERROR}. hence deleting the pod"
)
scaled_down.append(constants.LOGWRITER_RBD_LABEL)

elif (
constants.LOGREADER_CEPHFS_LABEL.split("=")[1] in labels
and constants.LOGREADER_CEPHFS_LABEL not in scaled_down
):
pod.delete()
continue

modify_job_parallelism_count(
job_name=constants.LOGREADER_CEPHFS_NAME,
count=0,
namespace=constants.STRETCH_CLUSTER_NAMESPACE,
)
wait_for_pods_deletion(
constants.LOGREADER_CEPHFS_LABEL,
timeout=300,
namespace=constants.STRETCH_CLUSTER_NAMESPACE,
# Get the pod describe output to verify the error
try:
logger.info(f"Fetching the `oc describe` output for pod {pod.name}")
desc_out = OCP().exec_oc_cmd(
command=f"describe pod {pod.name}", out_yaml_format=False
)
scaled_down.append(constants.LOGREADER_CEPHFS_LABEL)
except CommandFailed as e:
if "NotFound" in e.args[0]:
raise UnexpectedBehaviour
else:
raise e

# checks for errors in the pod describe output
if check_errors_regex(desc_out, error_messages):

if (
app_label.split("=")[1] in str(pod.get_labels())
and app_label == constants.LOGWRITER_CEPHFS_LABEL
):

logger.info("Scaling down the deployment for logwriter")
modify_deployment_replica_count(
deployment_name=constants.LOGWRITER_CEPHFS_NAME,
replica_count=0,
namespace=constants.STRETCH_CLUSTER_NAMESPACE,
)
wait_for_pods_deletion(
constants.LOGWRITER_CEPHFS_LABEL,
timeout=300,
namespace=constants.STRETCH_CLUSTER_NAMESPACE,
)
scaled_down.append(constants.LOGWRITER_CEPHFS_LABEL)
break

elif (
app_label.split("=")[1] in str(pod.get_labels())
and app_label == constants.LOGREADER_CEPHFS_LABEL
):

logger.info("Scaling down the job for logreader")
modify_job_parallelism_count(
job_name=constants.LOGREADER_CEPHFS_NAME,
count=0,
namespace=constants.STRETCH_CLUSTER_NAMESPACE,
)
wait_for_pods_deletion(
constants.LOGREADER_CEPHFS_LABEL,
timeout=300,
namespace=constants.STRETCH_CLUSTER_NAMESPACE,
)
scaled_down.append(constants.LOGREADER_CEPHFS_LABEL)
break

elif (
app_label.split("=")[1] in str(pod.get_labels())
and app_label == constants.LOGWRITER_RBD_LABEL
):

logger.info("Scaling down logwriter rbd statefulset")
modify_statefulset_replica_count(
statefulset_name=constants.LOGWRITER_RBD_NAME,
replica_count=0,
namespace=constants.STRETCH_CLUSTER_NAMESPACE,
)
wait_for_pods_deletion(
constants.LOGWRITER_RBD_LABEL,
timeout=300,
namespace=constants.STRETCH_CLUSTER_NAMESPACE,
)
scaled_down.append(constants.LOGWRITER_RBD_LABEL)
break

# for all the scaled down workloads we scale them up
# one by one
for label in scaled_down:
if label == constants.LOGWRITER_CEPHFS_LABEL:
logger.info("Scaling up logwriter deployment now")
modify_deployment_replica_count(
deployment_name=constants.LOGWRITER_CEPHFS_NAME,
replica_count=4,
namespace=constants.STRETCH_CLUSTER_NAMESPACE,
)
elif label == constants.LOGWRITER_RBD_LABEL:
logger.info("Scaling up logwriter rbd statefulset now")
modify_statefulset_replica_count(
statefulset_name=constants.LOGWRITER_RBD_NAME,
replica_count=2,
namespace=constants.STRETCH_CLUSTER_NAMESPACE,
)
elif label == constants.LOGREADER_CEPHFS_LABEL:
logger.info("Scaling up logwriter job now")
modify_job_parallelism_count(
job_name=constants.LOGREADER_CEPHFS_NAME,
count=4,
namespace=constants.STRETCH_CLUSTER_NAMESPACE,
)

# fetch workload pod details now and make sure all of them are running
logger.info("Checking if the logwriter pods are up and running now")
sc_obj.get_logwriter_reader_pods(label=constants.LOGWRITER_CEPHFS_LABEL)
sc_obj.get_logwriter_reader_pods(
label=constants.LOGREADER_CEPHFS_LABEL, statuses=["Running", "Completed"]
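
The helper file above now exposes `check_for_logwriter_workload_pods()` as a module-level entry point and retries the recovery workaround via `@retry(UnexpectedBehaviour, tries=5, delay=10, backoff=1)`. A minimal usage sketch, not part of this commit and assuming `StretchCluster()` is constructed with no arguments as in the existing tests:

```python
from ocs_ci.ocs.resources.stretchcluster import StretchCluster
from ocs_ci.helpers.stretchcluster_helper import check_for_logwriter_workload_pods

# Assumed: an object representing the stretch cluster under test
sc_obj = StretchCluster()

# Verifies the CephFS logwriter/logreader and RBD logwriter pods; if any are
# not Running/Completed it falls back to recover_workload_pods_post_recovery(),
# which is retried up to 5 times with a fixed 10-second delay (backoff=1).
check_for_logwriter_workload_pods(sc_obj)
```
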
33 changes: 4 additions & 29 deletions tests/functional/disaster-recovery/sc_arbiter/test_netsplit.py
@@ -9,10 +9,9 @@
stretchcluster_required,
)
from ocs_ci.helpers.stretchcluster_helper import (
recover_workload_pods_post_recovery,
check_for_logwriter_workload_pods,
recover_from_ceph_stuck,
)
from ocs_ci.ocs.exceptions import UnexpectedBehaviour

from ocs_ci.ocs.resources.stretchcluster import StretchCluster
from ocs_ci.ocs.exceptions import CephHealthException
@@ -25,7 +24,6 @@
from ocs_ci.ocs.resources.pod import (
wait_for_pods_to_be_in_statuses,
get_ceph_tools_pod,
get_not_running_pods,
)

logger = logging.getLogger(__name__)
@@ -35,29 +33,6 @@
@stretchcluster_required
@turquoise_squad
class TestNetSplit:
def check_for_logwriter_workload_pods(
self,
sc_obj,
):

try:
sc_obj.get_logwriter_reader_pods(label=constants.LOGWRITER_CEPHFS_LABEL)
sc_obj.get_logwriter_reader_pods(
label=constants.LOGREADER_CEPHFS_LABEL,
statuses=[constants.STATUS_RUNNING, constants.STATUS_COMPLETED],
)
sc_obj.get_logwriter_reader_pods(
label=constants.LOGWRITER_RBD_LABEL, exp_num_replicas=2
)
except UnexpectedBehaviour:

logger.info("some pods are not running, so trying the work-around")
pods_not_running = get_not_running_pods(
namespace=constants.STRETCH_CLUSTER_NAMESPACE
)
recover_workload_pods_post_recovery(sc_obj, pods_not_running)
logger.info("All the workloads pods are successfully up and running")

@pytest.fixture()
def init_sanity(self, request):
"""
@@ -86,7 +61,7 @@ def finalizer():
argvalues=[
pytest.param(
constants.NETSPLIT_DATA_1_DATA_2,
30,
15,
marks=[
pytest.mark.polarion_id("OCS-5069"),
pytest.mark.polarion_id("OCS-5071"),
@@ -177,7 +152,7 @@ def test_netsplit(
md5sum_before = vm_obj.run_ssh_cmd(command="md5sum /file_1.txt")

# note all the pod names
self.check_for_logwriter_workload_pods(sc_obj)
check_for_logwriter_workload_pods(sc_obj)

# note the file names created and each file start write time
# note the file names created
@@ -248,7 +223,7 @@ def test_netsplit(
sc_obj.post_failure_checks(start_time, end_time, wait_for_read_completion=False)

# check for any data loss
self.check_for_logwriter_workload_pods(sc_obj)
check_for_logwriter_workload_pods(sc_obj)
assert sc_obj.check_for_data_loss(
constants.LOGWRITER_CEPHFS_LABEL
), "[CephFS] Data is lost"
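
The test above now delegates pod verification to the shared helper instead of a per-class copy, and the recovery path it exercises depends on the `retry` decorator imported from `ocs_ci.ocs.utils`. The generic sketch below illustrates what a decorator with that signature typically does; it is not the ocs_ci implementation, whose details may differ. The idea is that a transient `UnexpectedBehaviour` (for example, a pod disappearing between listing it and describing it) restarts the whole recovery attempt instead of failing immediately.

```python
import functools
import logging
import time

logger = logging.getLogger(__name__)


class UnexpectedBehaviour(Exception):
    """Stand-in for ocs_ci.ocs.exceptions.UnexpectedBehaviour."""


def retry(exception, tries=5, delay=10, backoff=1):
    """Re-run the wrapped function while `exception` keeps being raised."""

    def decorator(func):
        @functools.wraps(func)
        def wrapper(*args, **kwargs):
            current_delay = delay
            for attempt in range(1, tries + 1):
                try:
                    return func(*args, **kwargs)
                except exception:
                    if attempt == tries:
                        raise
                    logger.info(
                        "Attempt %d/%d failed, retrying in %ss",
                        attempt, tries, current_delay,
                    )
                    time.sleep(current_delay)
                    current_delay *= backoff

        return wrapper

    return decorator


@retry(UnexpectedBehaviour, tries=3, delay=1, backoff=1)
def recover():
    # Placeholder for a recovery step that may raise UnexpectedBehaviour
    # (e.g. when `oc describe` reports NotFound for a vanished pod).
    ...
```

With `backoff=1` the delay between attempts stays constant, which matches the decorator arguments used in the commit.
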
