csi-driver/pkg/csi/node.go

206 lines
6.8 KiB
Go
Raw Normal View History

2024-09-28 05:44:56 +00:00
/*
Copyright 2024 p5x.
Licensed under the Apache License, Version 2.0 (the "License");
you may not use this file except in compliance with the License.
You may obtain a copy of the License at
http://www.apache.org/licenses/LICENSE-2.0
Unless required by applicable law or agreed to in writing, software
distributed under the License is distributed on an "AS IS" BASIS,
WITHOUT WARRANTIES OR CONDITIONS OF ANY KIND, either express or implied.
See the License for the specific language governing permissions and
limitations under the License.
*/
package csi
import (
"context"
"errors"
"k8s.io/klog"
2024-09-28 05:44:56 +00:00
"github.com/container-storage-interface/spec/lib/go/csi"
"google.golang.org/grpc/codes"
"google.golang.org/grpc/status"
)
var (
volumeCaps = []csi.VolumeCapability_AccessMode{
{
Mode: csi.VolumeCapability_AccessMode_SINGLE_NODE_WRITER,
},
{
Mode: csi.VolumeCapability_AccessMode_MULTI_NODE_MULTI_WRITER,
},
{
Mode: csi.VolumeCapability_AccessMode_MULTI_NODE_READER_ONLY,
},
}
)
type nodeService struct {
nodeID string
p5x *p5xApi
2024-09-28 05:44:56 +00:00
csi.UnimplementedNodeServer
}
var _ csi.NodeServer = &nodeService{}
func newNodeService(nodeID string, p5x *p5xApi) nodeService {
2024-09-28 05:44:56 +00:00
return nodeService{
nodeID: nodeID,
p5x: p5x,
2024-09-28 05:44:56 +00:00
}
}
// NodeStageVolume is called by the CO when a workload that wants to use the specified volume is placed (scheduled) on a node.
func (n *nodeService) NodeStageVolume(ctx context.Context, request *csi.NodeStageVolumeRequest) (*csi.NodeStageVolumeResponse, error) {
klog.Infof("node.NodeStageVolume: Staging volume %s -> %s", request.GetVolumeId(), request.GetStagingTargetPath())
return nil, status.Error(codes.Unimplemented, "node.NodeStageVolume")
2024-09-28 05:44:56 +00:00
}
// NodeUnstageVolume is called by the CO when a workload that was using the specified volume is being moved to a different node.
func (n *nodeService) NodeUnstageVolume(ctx context.Context, request *csi.NodeUnstageVolumeRequest) (*csi.NodeUnstageVolumeResponse, error) {
klog.Infof("node.NodeUnstageVolume: Staging volume %s -> %s", request.GetVolumeId(), request.GetStagingTargetPath())
return nil, status.Error(codes.Unimplemented, "node.NodeUnstageVolume")
2024-09-28 05:44:56 +00:00
}
// NodePublishVolume mounts the volume on the node.
func (n *nodeService) NodePublishVolume(ctx context.Context, request *csi.NodePublishVolumeRequest) (*csi.NodePublishVolumeResponse, error) {
klog.Infof("node.NodePublishVolume: Publishing volume %s -> %s", request.GetVolumeId(), request.GetTargetPath())
2024-09-28 05:44:56 +00:00
volumeID := request.GetVolumeId()
if len(volumeID) == 0 {
return nil, status.Error(codes.InvalidArgument, "Volume id not provided")
}
target := request.GetTargetPath()
if len(target) == 0 {
return nil, status.Error(codes.InvalidArgument, "Target path not provided")
}
volCap := request.GetVolumeCapability()
if volCap == nil {
return nil, status.Error(codes.InvalidArgument, "Volume capability not provided")
}
if !isValidVolumeCapabilities([]*csi.VolumeCapability{volCap}) {
return nil, status.Error(codes.InvalidArgument, "Volume capability not supported")
}
readOnly := false
if request.GetReadonly() || request.VolumeCapability.AccessMode.GetMode() == csi.VolumeCapability_AccessMode_MULTI_NODE_READER_ONLY {
readOnly = true
}
options := make(map[string]string)
if m := volCap.GetMount(); m != nil {
for _, f := range m.MountFlags {
// get mountOptions from PV.spec.mountOptions
options[f] = ""
}
}
if readOnly {
options["mountReadOnly"] = "true"
2024-09-28 05:44:56 +00:00
}
vol, err := n.p5x.GetVolumeByName(request.GetVolumeId())
if err != nil {
return nil, err
}
_, err = n.p5x.MountVolume(vol, request.GetTargetPath(), options)
if err != nil {
return nil, err
}
2024-09-28 05:44:56 +00:00
klog.Infof("node.NodePublishVolume: Successfully published volume %s -> %s", request.GetVolumeId(), request.GetTargetPath())
return &csi.NodePublishVolumeResponse{}, nil
2024-09-28 05:44:56 +00:00
}
// NodeUnpublishVolume unmount the volume from the target path
func (n *nodeService) NodeUnpublishVolume(ctx context.Context, request *csi.NodeUnpublishVolumeRequest) (*csi.NodeUnpublishVolumeResponse, error) {
klog.Infof("node.NodeUnpublishVolume: Unpublishing volume %s -> %s", request.GetVolumeId(), request.GetTargetPath())
2024-09-28 05:44:56 +00:00
target := request.GetTargetPath()
if len(target) == 0 {
return nil, status.Error(codes.InvalidArgument, "Target path not provided")
}
vol, err := n.p5x.GetVolumeByName(request.GetVolumeId())
if err != nil {
if errors.Is(err, ErrVolumeNotFound) {
klog.Infof("node.NodeUnpublishVolume: Could not find volume %s - assuming it has already been deleted", request.GetVolumeId())
return &csi.NodeUnpublishVolumeResponse{}, nil
}
return nil, err
}
_, err = n.p5x.UnmountVolume(vol)
if err != nil {
return nil, err
}
2024-09-28 05:44:56 +00:00
klog.Infof("node.NodeUnpublishVolume: Successfully unpublished volume %s -> %s", request.GetVolumeId(), request.GetTargetPath())
return &csi.NodeUnpublishVolumeResponse{}, nil
2024-09-28 05:44:56 +00:00
}
// NodeGetVolumeStats get the volume stats
func (n *nodeService) NodeGetVolumeStats(ctx context.Context, request *csi.NodeGetVolumeStatsRequest) (*csi.NodeGetVolumeStatsResponse, error) {
klog.Infof("node.NodeGetVolumeStats: For volume %s -> %s", request.GetVolumeId(), request.GetVolumePath())
return nil, status.Error(codes.Unimplemented, "node.NodeGetVolumeStats")
2024-09-28 05:44:56 +00:00
}
// NodeExpandVolume expand the volume
func (n *nodeService) NodeExpandVolume(ctx context.Context, request *csi.NodeExpandVolumeRequest) (*csi.NodeExpandVolumeResponse, error) {
klog.Infof("node.NodeExpandVolume: %s -> %s (min bytes: %d)", request.GetVolumeId(), request.GetVolumePath(), request.GetCapacityRange().GetRequiredBytes())
return nil, status.Error(codes.Unimplemented, "node.NodeExpandVolume")
2024-09-28 05:44:56 +00:00
}
// NodeGetCapabilities get the node capabilities
func (n *nodeService) NodeGetCapabilities(ctx context.Context, request *csi.NodeGetCapabilitiesRequest) (*csi.NodeGetCapabilitiesResponse, error) {
klog.Infof("node.NodeGetCapabilities: Called")
2024-09-28 05:44:56 +00:00
return &csi.NodeGetCapabilitiesResponse{}, nil
}
// NodeGetInfo get the node info
func (n *nodeService) NodeGetInfo(ctx context.Context, request *csi.NodeGetInfoRequest) (*csi.NodeGetInfoResponse, error) {
klog.Infof("node.NodeGetInfo: Called")
2024-09-28 05:44:56 +00:00
return &csi.NodeGetInfoResponse{NodeId: n.nodeID}, nil
}
func isValidVolumeCapabilities(volCaps []*csi.VolumeCapability) bool {
hasSupport := func(cap *csi.VolumeCapability) bool {
if csi.VolumeCapability_AccessMode_SINGLE_NODE_WRITER == cap.AccessMode.GetMode() {
return true
}
if csi.VolumeCapability_AccessMode_MULTI_NODE_MULTI_WRITER == cap.AccessMode.GetMode() {
return true
}
if csi.VolumeCapability_AccessMode_MULTI_NODE_READER_ONLY == cap.AccessMode.GetMode() {
return true
}
// for _, c := range volumeCaps {
// if c.GetMode() == cap.AccessMode.GetMode() {
// return true
// }
// }
return false
}
foundAll := true
for _, c := range volCaps {
if !hasSupport(c) {
foundAll = false
}
}
return foundAll
}