diff --git a/pkg/csi/controller.go b/pkg/csi/controller.go index a173ea3..ae8609e 100644 --- a/pkg/csi/controller.go +++ b/pkg/csi/controller.go @@ -122,13 +122,26 @@ func (d *controllerService) ControllerGetCapabilities(ctx context.Context, reque // ControllerPublishVolume publish a volume func (d *controllerService) ControllerPublishVolume(ctx context.Context, request *csi.ControllerPublishVolumeRequest) (*csi.ControllerPublishVolumeResponse, error) { klog.Infof("controller.ControllerPublishVolume: %s on node %s", request.GetVolumeId(), request.GetNodeId()) - return nil, status.Error(codes.Unimplemented, "controller.ControllerPublishVolume") + + vol, err := d.p5x.GetVolumeByName(request.GetVolumeId()) + if err != nil { + return nil, err + } + + _, err = d.p5x.TransferVolume(vol, request.GetNodeId()) + if err != nil { + return nil, err + } + + return &csi.ControllerPublishVolumeResponse{}, nil } // ControllerUnpublishVolume unpublish a volume func (d *controllerService) ControllerUnpublishVolume(ctx context.Context, request *csi.ControllerUnpublishVolumeRequest) (*csi.ControllerUnpublishVolumeResponse, error) { klog.Infof("controller.ControllerUnpublishVolume: %s on node %s", request.GetVolumeId(), request.GetNodeId()) - return nil, status.Error(codes.Unimplemented, "controller.ControllerUnpublishVolume") + + // No need to "unpublish" a volume currently -- it will be transferred to the correct node next time it's published + return &csi.ControllerUnpublishVolumeResponse{}, nil } // ValidateVolumeCapabilities validate volume capabilities diff --git a/pkg/csi/driver.go b/pkg/csi/driver.go index a641477..8870b16 100644 --- a/pkg/csi/driver.go +++ b/pkg/csi/driver.go @@ -61,7 +61,7 @@ func NewDriver(endpoint string, nodeID string, p5xEndpoint string, p5xToken stri return &Driver{ endpoint: endpoint, controllerService: newControllerService(p5x), - nodeService: newNodeService(nodeID), + nodeService: newNodeService(nodeID, p5x), P5x: p5x, } } diff --git a/pkg/csi/node.go b/pkg/csi/node.go index c5123d3..8ae84f0 100644 --- a/pkg/csi/node.go +++ b/pkg/csi/node.go @@ -41,14 +41,16 @@ var ( type nodeService struct { nodeID string + p5x *p5xApi csi.UnimplementedNodeServer } var _ csi.NodeServer = &nodeService{} -func newNodeService(nodeID string) nodeService { +func newNodeService(nodeID string, p5x *p5xApi) nodeService { return nodeService{ nodeID: nodeID, + p5x: p5x, } } @@ -101,13 +103,21 @@ func (n *nodeService) NodePublishVolume(ctx context.Context, request *csi.NodePu } if readOnly { - // Todo add readonly in your mount options + options["mountReadOnly"] = "true" } - // TODO modify your volume mount logic here + vol, err := n.p5x.GetVolumeByName(request.GetVolumeId()) + if err != nil { + return nil, err + } - return nil, status.Error(codes.Unimplemented, "node.NodePublishVolume") - // return &csi.NodePublishVolumeResponse{}, nil + _, err = n.p5x.MountVolume(vol, request.GetTargetPath(), options) + if err != nil { + return nil, err + } + + klog.Infof("node.NodePublishVolume: Successfully published volume %s -> %s", request.GetVolumeId(), request.GetTargetPath()) + return &csi.NodePublishVolumeResponse{}, nil } // NodeUnpublishVolume unmount the volume from the target path @@ -119,10 +129,18 @@ func (n *nodeService) NodeUnpublishVolume(ctx context.Context, request *csi.Node return nil, status.Error(codes.InvalidArgument, "Target path not provided") } - // TODO modify your volume umount logic here + vol, err := n.p5x.GetVolumeByName(request.GetVolumeId()) + if err != nil { + return nil, err + } - return nil, status.Error(codes.Unimplemented, "node.NodeUnpublishVolume") - // return &csi.NodeUnpublishVolumeResponse{}, nil + _, err = n.p5x.UnmountVolume(vol) + if err != nil { + return nil, err + } + + klog.Infof("node.NodeUnpublishVolume: Successfully unpublished volume %s -> %s", request.GetVolumeId(), request.GetTargetPath()) + return &csi.NodeUnpublishVolumeResponse{}, nil } // NodeGetVolumeStats get the volume stats diff --git a/pkg/csi/p5x.go b/pkg/csi/p5x.go index 1d129d3..9c8b6fd 100644 --- a/pkg/csi/p5x.go +++ b/pkg/csi/p5x.go @@ -75,6 +75,76 @@ func (p5x *p5xApi) DeleteVolume(volume *p5xVolume) error { return nil } +func (p5x *p5xApi) TransferVolume(volume *p5xVolume, nodeName string) (*p5xVolume, error) { + klog.Info("p5x.TransferVolume: ", volume, " | to node: ", nodeName) + route := fmt.Sprintf("volumes/%s/transfer-to/%s", volume.Name, nodeName) + resBody, err := p5x.MakeRequest(http.MethodPost, route, []byte(`{}`)) + if err != nil { + return nil, err + } + + vol := &p5xVolume{} + err = json.Unmarshal(resBody, vol) + if err != nil { + return nil, err + } + + klog.Info("p5x.TransferVolume: Successfully transferred volume:", volume.Name, vol) + return vol, nil +} + +type MountVolumeRequest struct { + Mountpoint string `json:"mountpoint"` + Options map[string]string `json:"options"` +} + +func (p5x *p5xApi) MountVolume(volume *p5xVolume, mountpoint string, options map[string]string) (*p5xVolume, error) { + klog.Info("p5x.MountVolume: ", volume, mountpoint, options) + req := &MountVolumeRequest{ + Mountpoint: mountpoint, + Options: options, + } + + body, err := json.Marshal(req) + if err != nil { + return nil, err + } + + route := fmt.Sprintf("volumes/%s/mount", volume.Name) + resBody, err := p5x.MakeRequest(http.MethodPost, route, body) + if err != nil { + return nil, err + } + + vol := &p5xVolume{} + err = json.Unmarshal(resBody, vol) + if err != nil { + return nil, err + } + + klog.Info("p5x.MountVolume: Successfully mounted volume: ", volume.Name, mountpoint) + return vol, nil +} + +func (p5x *p5xApi) UnmountVolume(volume *p5xVolume) (*p5xVolume, error) { + klog.Info("p5x.UnmountVolume: ", volume) + + route := fmt.Sprintf("volumes/%s/unmount", volume.Name) + resBody, err := p5x.MakeRequest(http.MethodPost, route, []byte(`{}`)) + if err != nil { + return nil, err + } + + vol := &p5xVolume{} + err = json.Unmarshal(resBody, vol) + if err != nil { + return nil, err + } + + klog.Info("p5x.UnmountVolume: Successfully unmounted volume: ", volume.Name) + return vol, nil +} + func (p5x *p5xApi) MakeRequest(method string, route string, body []byte) ([]byte, error) { url := fmt.Sprintf("%s:%d/api/v1/%s", p5x.endpoint, p5x.port, route) klog.Infof("p5x.MakeRequest: [%s] %s", method, url)