[WIP] More work implementing create/delete volume and publish/unpublish volume

This commit is contained in:
2024-09-29 10:37:55 -04:00
parent c98b421b03
commit b9f2259674
17 changed files with 225 additions and 85 deletions

View File

@@ -18,6 +18,7 @@ package csi
import (
"context"
"k8s.io/klog"
"github.com/container-storage-interface/spec/lib/go/csi"
"google.golang.org/grpc/codes"
@@ -27,6 +28,7 @@ import (
var (
controllerCaps = []csi.ControllerServiceCapability_RPC_Type{
csi.ControllerServiceCapability_RPC_CREATE_DELETE_VOLUME,
csi.ControllerServiceCapability_RPC_PUBLISH_UNPUBLISH_VOLUME,
}
)
@@ -45,6 +47,7 @@ func newControllerService(p5x *p5xApi) controllerService {
// CreateVolume creates a volume
func (d *controllerService) CreateVolume(ctx context.Context, request *csi.CreateVolumeRequest) (*csi.CreateVolumeResponse, error) {
klog.Infof("controller.CreateVolume: %s (size: %d)", request.Name, request.CapacityRange.GetRequiredBytes())
if len(request.Name) == 0 {
return nil, status.Error(codes.InvalidArgument, "Volume Name cannot be empty")
}
@@ -64,6 +67,7 @@ func (d *controllerService) CreateVolume(ctx context.Context, request *csi.Creat
CapacityBytes: vol.SizeInBytes,
}
klog.Info("controller.CreateVolume: Created volume: ", vol)
return &csi.CreateVolumeResponse{Volume: &csiVol}, nil
// volCtx := make(map[string]string)
@@ -82,7 +86,9 @@ func (d *controllerService) CreateVolume(ctx context.Context, request *csi.Creat
// DeleteVolume deletes a volume
func (d *controllerService) DeleteVolume(ctx context.Context, request *csi.DeleteVolumeRequest) (*csi.DeleteVolumeResponse, error) {
vol, err := d.p5x.GetVolumeByName(request.VolumeId)
klog.Infof("controller.DeleteVolume: %s", request.GetVolumeId())
vol, err := d.p5x.GetVolumeByName(request.GetVolumeId())
if err != nil {
return nil, err
}
@@ -92,11 +98,13 @@ func (d *controllerService) DeleteVolume(ctx context.Context, request *csi.Delet
return nil, err
}
klog.Info("controller.DeleteVolume: Successfully deleted volume: ", vol)
return &csi.DeleteVolumeResponse{}, nil
}
// ControllerGetCapabilities get controller capabilities
func (d *controllerService) ControllerGetCapabilities(ctx context.Context, request *csi.ControllerGetCapabilitiesRequest) (*csi.ControllerGetCapabilitiesResponse, error) {
klog.Info("controller.ControllerGetCapabilities: called")
var caps []*csi.ControllerServiceCapability
for _, cap := range controllerCaps {
c := &csi.ControllerServiceCapability{
@@ -113,55 +121,66 @@ func (d *controllerService) ControllerGetCapabilities(ctx context.Context, reque
// ControllerPublishVolume publish a volume
func (d *controllerService) ControllerPublishVolume(ctx context.Context, request *csi.ControllerPublishVolumeRequest) (*csi.ControllerPublishVolumeResponse, error) {
return nil, status.Error(codes.Unimplemented, "")
klog.Infof("controller.ControllerPublishVolume: %s on node %s", request.GetVolumeId(), request.GetNodeId())
return nil, status.Error(codes.Unimplemented, "controller.ControllerPublishVolume")
}
// ControllerUnpublishVolume unpublish a volume
func (d *controllerService) ControllerUnpublishVolume(ctx context.Context, request *csi.ControllerUnpublishVolumeRequest) (*csi.ControllerUnpublishVolumeResponse, error) {
return nil, status.Error(codes.Unimplemented, "")
klog.Infof("controller.ControllerUnpublishVolume: %s on node %s", request.GetVolumeId(), request.GetNodeId())
return nil, status.Error(codes.Unimplemented, "controller.ControllerUnpublishVolume")
}
// ValidateVolumeCapabilities validate volume capabilities
func (d *controllerService) ValidateVolumeCapabilities(ctx context.Context, request *csi.ValidateVolumeCapabilitiesRequest) (*csi.ValidateVolumeCapabilitiesResponse, error) {
return nil, status.Error(codes.Unimplemented, "")
klog.Infof("controller.ValidateVolumeCapabilities: %s", request.GetVolumeId())
return nil, status.Error(codes.Unimplemented, "controller.ValidateVolumeCapabilities")
}
// ListVolumes list volumes
func (d *controllerService) ListVolumes(ctx context.Context, request *csi.ListVolumesRequest) (*csi.ListVolumesResponse, error) {
return nil, status.Error(codes.Unimplemented, "")
klog.Infof("controller.ListVolumes: called")
return nil, status.Error(codes.Unimplemented, "controller.ListVolumes")
}
// GetCapacity get capacity
func (d *controllerService) GetCapacity(ctx context.Context, request *csi.GetCapacityRequest) (*csi.GetCapacityResponse, error) {
return nil, status.Error(codes.Unimplemented, "")
klog.Infof("controller.GetCapacity: called")
return nil, status.Error(codes.Unimplemented, "controller.GetCapacity")
}
// CreateSnapshot create a snapshot
func (d *controllerService) CreateSnapshot(ctx context.Context, request *csi.CreateSnapshotRequest) (*csi.CreateSnapshotResponse, error) {
return nil, status.Error(codes.Unimplemented, "")
klog.Infof("controller.CreateSnapshot: called")
return nil, status.Error(codes.Unimplemented, "controller.CreateSnapshot")
}
// DeleteSnapshot delete a snapshot
func (d *controllerService) DeleteSnapshot(ctx context.Context, request *csi.DeleteSnapshotRequest) (*csi.DeleteSnapshotResponse, error) {
return nil, status.Error(codes.Unimplemented, "")
klog.Infof("controller.DeleteSnapshot: called")
return nil, status.Error(codes.Unimplemented, "controller.DeleteSnapshot")
}
// ListSnapshots list snapshots
func (d *controllerService) ListSnapshots(ctx context.Context, request *csi.ListSnapshotsRequest) (*csi.ListSnapshotsResponse, error) {
return nil, status.Error(codes.Unimplemented, "")
klog.Infof("controller.ListSnapshots: called")
return nil, status.Error(codes.Unimplemented, "controller.ListSnapshots")
}
// ControllerExpandVolume expand a volume
func (d *controllerService) ControllerExpandVolume(ctx context.Context, request *csi.ControllerExpandVolumeRequest) (*csi.ControllerExpandVolumeResponse, error) {
return nil, status.Error(codes.Unimplemented, "")
klog.Infof("controller.ControllerExpandVolume: %s (size: %d)", request.GetVolumeId(), request.GetCapacityRange().GetRequiredBytes())
return nil, status.Error(codes.Unimplemented, "controller.ControllerExpandVolume")
}
// ControllerGetVolume get a volume
func (d *controllerService) ControllerGetVolume(ctx context.Context, request *csi.ControllerGetVolumeRequest) (*csi.ControllerGetVolumeResponse, error) {
return nil, status.Error(codes.Unimplemented, "")
klog.Infof("controller.ControllerGetVolume: %s", request.GetVolumeId())
return nil, status.Error(codes.Unimplemented, "controller.ControllerGetVolume")
}
// ControllerModifyVolume modify a volume
func (d *controllerService) ControllerModifyVolume(ctx context.Context, request *csi.ControllerModifyVolumeRequest) (*csi.ControllerModifyVolumeResponse, error) {
return nil, status.Error(codes.Unimplemented, "")
klog.Infof("controller.ControllerModifyVolume: %s", request.GetVolumeId())
return nil, status.Error(codes.Unimplemented, "controller.ControllerModifyVolume")
}

View File

@@ -50,7 +50,7 @@ type Driver struct {
// NewDriver creates a new driver
func NewDriver(endpoint string, nodeID string, p5xEndpoint string, p5xToken string, p5xPort int64) *Driver {
klog.Infof("Driver: %v version %v commit %v date %v", DriverName, driverVersion, gitCommit, buildDate)
klog.Infof("Driver: %v | Version: %v | Commit: %v | Date: %v", DriverName, driverVersion, gitCommit, buildDate)
p5x := &p5xApi{
endpoint: p5xEndpoint,

View File

@@ -18,6 +18,7 @@ package csi
import (
"context"
"k8s.io/klog"
"github.com/container-storage-interface/spec/lib/go/csi"
"google.golang.org/grpc/codes"
@@ -53,16 +54,20 @@ func newNodeService(nodeID string) nodeService {
// NodeStageVolume is called by the CO when a workload that wants to use the specified volume is placed (scheduled) on a node.
func (n *nodeService) NodeStageVolume(ctx context.Context, request *csi.NodeStageVolumeRequest) (*csi.NodeStageVolumeResponse, error) {
return nil, status.Error(codes.Unimplemented, "")
klog.Infof("node.NodeStageVolume: Staging volume %s -> %s", request.GetVolumeId(), request.GetStagingTargetPath())
return nil, status.Error(codes.Unimplemented, "node.NodeStageVolume")
}
// NodeUnstageVolume is called by the CO when a workload that was using the specified volume is being moved to a different node.
func (n *nodeService) NodeUnstageVolume(ctx context.Context, request *csi.NodeUnstageVolumeRequest) (*csi.NodeUnstageVolumeResponse, error) {
return nil, status.Error(codes.Unimplemented, "")
klog.Infof("node.NodeUnstageVolume: Staging volume %s -> %s", request.GetVolumeId(), request.GetStagingTargetPath())
return nil, status.Error(codes.Unimplemented, "node.NodeUnstageVolume")
}
// NodePublishVolume mounts the volume on the node.
func (n *nodeService) NodePublishVolume(ctx context.Context, request *csi.NodePublishVolumeRequest) (*csi.NodePublishVolumeResponse, error) {
klog.Infof("node.NodePublishVolume: Publishing volume %s -> %s", request.GetVolumeId(), request.GetTargetPath())
volumeID := request.GetVolumeId()
if len(volumeID) == 0 {
return nil, status.Error(codes.InvalidArgument, "Volume id not provided")
@@ -101,11 +106,14 @@ func (n *nodeService) NodePublishVolume(ctx context.Context, request *csi.NodePu
// TODO modify your volume mount logic here
return &csi.NodePublishVolumeResponse{}, nil
return nil, status.Error(codes.Unimplemented, "node.NodePublishVolume")
// return &csi.NodePublishVolumeResponse{}, nil
}
// NodeUnpublishVolume unmount the volume from the target path
func (n *nodeService) NodeUnpublishVolume(ctx context.Context, request *csi.NodeUnpublishVolumeRequest) (*csi.NodeUnpublishVolumeResponse, error) {
klog.Infof("node.NodeUnpublishVolume: Unpublishing volume %s -> %s", request.GetVolumeId(), request.GetTargetPath())
target := request.GetTargetPath()
if len(target) == 0 {
return nil, status.Error(codes.InvalidArgument, "Target path not provided")
@@ -113,26 +121,31 @@ func (n *nodeService) NodeUnpublishVolume(ctx context.Context, request *csi.Node
// TODO modify your volume umount logic here
return &csi.NodeUnpublishVolumeResponse{}, nil
return nil, status.Error(codes.Unimplemented, "node.NodeUnpublishVolume")
// return &csi.NodeUnpublishVolumeResponse{}, nil
}
// NodeGetVolumeStats get the volume stats
func (n *nodeService) NodeGetVolumeStats(ctx context.Context, request *csi.NodeGetVolumeStatsRequest) (*csi.NodeGetVolumeStatsResponse, error) {
return nil, status.Error(codes.Unimplemented, "")
klog.Infof("node.NodeGetVolumeStats: For volume %s -> %s", request.GetVolumeId(), request.GetVolumePath())
return nil, status.Error(codes.Unimplemented, "node.NodeGetVolumeStats")
}
// NodeExpandVolume expand the volume
func (n *nodeService) NodeExpandVolume(ctx context.Context, request *csi.NodeExpandVolumeRequest) (*csi.NodeExpandVolumeResponse, error) {
return nil, status.Error(codes.Unimplemented, "")
klog.Infof("node.NodeExpandVolume: %s -> %s (min bytes: %d)", request.GetVolumeId(), request.GetVolumePath(), request.GetCapacityRange().GetRequiredBytes())
return nil, status.Error(codes.Unimplemented, "node.NodeExpandVolume")
}
// NodeGetCapabilities get the node capabilities
func (n *nodeService) NodeGetCapabilities(ctx context.Context, request *csi.NodeGetCapabilitiesRequest) (*csi.NodeGetCapabilitiesResponse, error) {
klog.Infof("node.NodeGetCapabilities: Called")
return &csi.NodeGetCapabilitiesResponse{}, nil
}
// NodeGetInfo get the node info
func (n *nodeService) NodeGetInfo(ctx context.Context, request *csi.NodeGetInfoRequest) (*csi.NodeGetInfoResponse, error) {
klog.Infof("node.NodeGetInfo: Called")
return &csi.NodeGetInfoResponse{NodeId: n.nodeID}, nil
}

View File

@@ -4,8 +4,6 @@ import (
"bytes"
"encoding/json"
"fmt"
"google.golang.org/grpc/codes"
"google.golang.org/grpc/status"
"io"
"k8s.io/klog"
"net/http"
@@ -29,6 +27,7 @@ func (p5x *p5xApi) CreateVolume(name string, sizeInBytes int64) (*p5xVolume, err
Name: name,
SizeInBytes: sizeInBytes,
}
klog.Info("p5x.CreateVolume: Attempting to create: ", vol)
body, err := json.Marshal(vol)
if err != nil {
@@ -41,47 +40,67 @@ func (p5x *p5xApi) CreateVolume(name string, sizeInBytes int64) (*p5xVolume, err
}
err = json.Unmarshal(resBody, vol)
klog.Info("Successfully created volume: ", vol)
klog.Info("p5x.CreateVolume: Successfully created volume: ", vol)
return vol, nil
}
func (p5x *p5xApi) GetVolumeByName(name string) (*p5xVolume, error) {
return nil, status.Error(codes.Unimplemented, "")
}
klog.Infof("p5x.GetVolumeByName: %s", name)
route := fmt.Sprintf("volumes/%s", name)
resBody, err := p5x.MakeRequest(http.MethodGet, route, nil)
if err != nil {
return nil, err
}
func (p5x *p5xApi) GetVolumeById(volumeId int64) (*p5xVolume, error) {
return nil, status.Error(codes.Unimplemented, "")
vol := &p5xVolume{}
err = json.Unmarshal(resBody, vol)
if err != nil {
return nil, err
}
klog.Info("p5x.GetVolumeByName: Retrieved volume: ", vol)
return vol, nil
}
func (p5x *p5xApi) DeleteVolume(volume *p5xVolume) error {
klog.Info("p5x.DeleteVolume: ", volume)
route := fmt.Sprintf("volumes/%s", volume.Name)
resBody, err := p5x.MakeRequest(http.MethodDelete, route, []byte(`{}`))
if err != nil {
return err
}
klog.Infof("Successfully deleted volume %s: %s", volume.Name, resBody)
klog.Infof("p5x.DeleteVolume: Successfully deleted volume %s: %s", volume.Name, resBody)
return nil
}
func (p5x *p5xApi) MakeRequest(method string, route string, body []byte) ([]byte, error) {
bodyReader := bytes.NewReader(body)
url := fmt.Sprintf("%s:%d/api/v1/%s", p5x.endpoint, p5x.port, route)
klog.Infof("p5x.MakeRequest: [%s] %s", method, url)
klog.Infof("p5x.MakeRequest: %s", body)
req, err := http.NewRequest(method, url, bodyReader)
if err != nil {
klog.Errorf("p5x.MakeRequest: could not create request: %s\n", err)
return nil, err
var res *http.Response
var err error
if method == http.MethodGet {
res, err = http.Get(url)
if err != nil {
return nil, err
}
} else {
klog.Infof("p5x.MakeRequest: body: %s", body)
bodyReader := bytes.NewReader(body)
req, err2 := http.NewRequest(method, url, bodyReader)
if err2 != nil {
klog.Errorf("p5x.MakeRequest: could not create request: %s\n", err)
return nil, err2
}
req.Header.Set("Accept", "application/json")
req.Header.Set("Authorization", fmt.Sprintf("Bearer %s", p5x.token))
req.Header.Set("Content-Type", "application/json")
res, err = http.DefaultClient.Do(req)
}
req.Header.Set("Accept", "application/json")
req.Header.Set("Authorization", fmt.Sprintf("Bearer %s", p5x.token))
req.Header.Set("Content-Type", "application/json")
res, err := http.DefaultClient.Do(req)
if err != nil {
klog.Errorf("p5x.MakeRequest: error executing request: %s\n", err)
return nil, err

View File

@@ -24,9 +24,9 @@ import (
// These are set during build time via -ldflags
var (
driverVersion string
gitCommit string
buildDate string
driverVersion string = "v0.0.1"
gitCommit string = "master"
buildDate string = "2024-09-28"
)
// VersionInfo struct