Skip to content
New issue

Have a question about this project? Sign up for a free GitHub account to open an issue and contact its maintainers and the community.

By clicking “Sign up for GitHub”, you agree to our terms of service and privacy statement. We’ll occasionally send you account related emails.

Already on GitHub? Sign in to your account

Premature stream termination is not propagated #66

Open
carlbraganza opened this issue Nov 12, 2024 · 1 comment · May be fixed by #86
Open

Premature stream termination is not propagated #66

carlbraganza opened this issue Nov 12, 2024 · 1 comment · May be fixed by #86
Assignees

Comments

@carlbraganza
Copy link
Contributor

carlbraganza commented Nov 12, 2024

What happened:
Premature termination of the gRPC stream (either due to client cancellation or sidecar detected error) must be propagated to the CSI driver by cancelling the context used to create the stream.

For example, we have logic that looks like this:

func (s *Server) GetMetadataAllocated(req *api.GetMetadataAllocatedRequest, stream api.SnapshotMetadata_GetMetadataAllocatedServer) error {
        ...
        ctx := stream.Context()
	csiStream, err := csi.NewSnapshotMetadataClient(s.csiConnection()).GetMetadataAllocated(ctx, csiReq)
	for {
		csiResp, err := csiStream.Recv()

		if err := clientStream.Send(clientResp); err != nil {
			return s.statusPassOrWrapError(err, codes.Internal, msgInternalFailedToSendResponseFmt, err)
		}
	}
}

A failure to send to the client unwinds in the sidecar, but does not cancel the downstream CSI driver - it would block on its Send call until it times out (if it had set a timeout).

What you expected to happen:

We should be doing the equivalent of this:

func (s *Server) GetMetadataAllocated(req *api.GetMetadataAllocatedRequest, stream api.SnapshotMetadata_GetMetadataAllocatedServer) error {
        ...
        ctx, cancelFn := context.WithCancel(stream.Context())
        defer cancelFn()

	csiStream, err := csi.NewSnapshotMetadataClient(s.csiConnection()).GetMetadataAllocated(ctx, csiReq)
	for {
		csiResp, err := csiStream.Recv()

		if err := clientStream.Send(clientResp); err != nil {
			return s.statusPassOrWrapError(err, codes.Internal, msgInternalFailedToSendResponseFmt, err)
		}
	}
}

Here the deferred cancel function would force cancellation of the CSI driver's stream either when a local error is detected or if the sidecar's client cancelled its stream.

How to reproduce it:

Anything else we need to know?:

Environment:

  • Driver version:
  • Kubernetes version (use kubectl version):
  • OS (e.g. from /etc/os-release):
  • Kernel (e.g. uname -a):
  • Install tools:
  • Others:
@carlbraganza
Copy link
Contributor Author

/assign @carlbraganza

@carlbraganza carlbraganza linked a pull request Dec 11, 2024 that will close this issue
Sign up for free to join this conversation on GitHub. Already have an account? Sign in to comment
Labels
None yet
Projects
None yet
Development

Successfully merging a pull request may close this issue.

1 participant