Implement controller publish/transfer logic + node mount/unmount

This commit is contained in:
Garrett Mills 2024-10-01 22:58:35 -04:00
parent b9f2259674
commit beee5a441d
4 changed files with 112 additions and 11 deletions

View File

@ -122,13 +122,26 @@ func (d *controllerService) ControllerGetCapabilities(ctx context.Context, reque
// ControllerPublishVolume publish a volume
func (d *controllerService) ControllerPublishVolume(ctx context.Context, request *csi.ControllerPublishVolumeRequest) (*csi.ControllerPublishVolumeResponse, error) {
klog.Infof("controller.ControllerPublishVolume: %s on node %s", request.GetVolumeId(), request.GetNodeId())
return nil, status.Error(codes.Unimplemented, "controller.ControllerPublishVolume")
vol, err := d.p5x.GetVolumeByName(request.GetVolumeId())
if err != nil {
return nil, err
}
_, err = d.p5x.TransferVolume(vol, request.GetNodeId())
if err != nil {
return nil, err
}
return &csi.ControllerPublishVolumeResponse{}, nil
}
// ControllerUnpublishVolume unpublish a volume
func (d *controllerService) ControllerUnpublishVolume(ctx context.Context, request *csi.ControllerUnpublishVolumeRequest) (*csi.ControllerUnpublishVolumeResponse, error) {
klog.Infof("controller.ControllerUnpublishVolume: %s on node %s", request.GetVolumeId(), request.GetNodeId())
return nil, status.Error(codes.Unimplemented, "controller.ControllerUnpublishVolume")
// No need to "unpublish" a volume currently -- it will be transferred to the correct node next time it's published
return &csi.ControllerUnpublishVolumeResponse{}, nil
}
// ValidateVolumeCapabilities validate volume capabilities

View File

@ -61,7 +61,7 @@ func NewDriver(endpoint string, nodeID string, p5xEndpoint string, p5xToken stri
return &Driver{
endpoint: endpoint,
controllerService: newControllerService(p5x),
nodeService: newNodeService(nodeID),
nodeService: newNodeService(nodeID, p5x),
P5x: p5x,
}
}

View File

@ -41,14 +41,16 @@ var (
type nodeService struct {
nodeID string
p5x *p5xApi
csi.UnimplementedNodeServer
}
var _ csi.NodeServer = &nodeService{}
func newNodeService(nodeID string) nodeService {
func newNodeService(nodeID string, p5x *p5xApi) nodeService {
return nodeService{
nodeID: nodeID,
p5x: p5x,
}
}
@ -101,13 +103,21 @@ func (n *nodeService) NodePublishVolume(ctx context.Context, request *csi.NodePu
}
if readOnly {
// Todo add readonly in your mount options
options["mountReadOnly"] = "true"
}
// TODO modify your volume mount logic here
vol, err := n.p5x.GetVolumeByName(request.GetVolumeId())
if err != nil {
return nil, err
}
return nil, status.Error(codes.Unimplemented, "node.NodePublishVolume")
// return &csi.NodePublishVolumeResponse{}, nil
_, err = n.p5x.MountVolume(vol, request.GetTargetPath(), options)
if err != nil {
return nil, err
}
klog.Infof("node.NodePublishVolume: Successfully published volume %s -> %s", request.GetVolumeId(), request.GetTargetPath())
return &csi.NodePublishVolumeResponse{}, nil
}
// NodeUnpublishVolume unmount the volume from the target path
@ -119,10 +129,18 @@ func (n *nodeService) NodeUnpublishVolume(ctx context.Context, request *csi.Node
return nil, status.Error(codes.InvalidArgument, "Target path not provided")
}
// TODO modify your volume umount logic here
vol, err := n.p5x.GetVolumeByName(request.GetVolumeId())
if err != nil {
return nil, err
}
return nil, status.Error(codes.Unimplemented, "node.NodeUnpublishVolume")
// return &csi.NodeUnpublishVolumeResponse{}, nil
_, err = n.p5x.UnmountVolume(vol)
if err != nil {
return nil, err
}
klog.Infof("node.NodeUnpublishVolume: Successfully unpublished volume %s -> %s", request.GetVolumeId(), request.GetTargetPath())
return &csi.NodeUnpublishVolumeResponse{}, nil
}
// NodeGetVolumeStats get the volume stats

View File

@ -75,6 +75,76 @@ func (p5x *p5xApi) DeleteVolume(volume *p5xVolume) error {
return nil
}
func (p5x *p5xApi) TransferVolume(volume *p5xVolume, nodeName string) (*p5xVolume, error) {
klog.Info("p5x.TransferVolume: ", volume, " | to node: ", nodeName)
route := fmt.Sprintf("volumes/%s/transfer-to/%s", volume.Name, nodeName)
resBody, err := p5x.MakeRequest(http.MethodPost, route, []byte(`{}`))
if err != nil {
return nil, err
}
vol := &p5xVolume{}
err = json.Unmarshal(resBody, vol)
if err != nil {
return nil, err
}
klog.Info("p5x.TransferVolume: Successfully transferred volume:", volume.Name, vol)
return vol, nil
}
type MountVolumeRequest struct {
Mountpoint string `json:"mountpoint"`
Options map[string]string `json:"options"`
}
func (p5x *p5xApi) MountVolume(volume *p5xVolume, mountpoint string, options map[string]string) (*p5xVolume, error) {
klog.Info("p5x.MountVolume: ", volume, mountpoint, options)
req := &MountVolumeRequest{
Mountpoint: mountpoint,
Options: options,
}
body, err := json.Marshal(req)
if err != nil {
return nil, err
}
route := fmt.Sprintf("volumes/%s/mount", volume.Name)
resBody, err := p5x.MakeRequest(http.MethodPost, route, body)
if err != nil {
return nil, err
}
vol := &p5xVolume{}
err = json.Unmarshal(resBody, vol)
if err != nil {
return nil, err
}
klog.Info("p5x.MountVolume: Successfully mounted volume: ", volume.Name, mountpoint)
return vol, nil
}
func (p5x *p5xApi) UnmountVolume(volume *p5xVolume) (*p5xVolume, error) {
klog.Info("p5x.UnmountVolume: ", volume)
route := fmt.Sprintf("volumes/%s/unmount", volume.Name)
resBody, err := p5x.MakeRequest(http.MethodPost, route, []byte(`{}`))
if err != nil {
return nil, err
}
vol := &p5xVolume{}
err = json.Unmarshal(resBody, vol)
if err != nil {
return nil, err
}
klog.Info("p5x.UnmountVolume: Successfully unmounted volume: ", volume.Name)
return vol, nil
}
func (p5x *p5xApi) MakeRequest(method string, route string, body []byte) ([]byte, error) {
url := fmt.Sprintf("%s:%d/api/v1/%s", p5x.endpoint, p5x.port, route)
klog.Infof("p5x.MakeRequest: [%s] %s", method, url)