[WIP] Start very early implementation

This commit is contained in:
2024-09-28 01:44:56 -04:00
commit c98b421b03
20 changed files with 1172 additions and 0 deletions

167
pkg/csi/controller.go Normal file
View File

@@ -0,0 +1,167 @@
/*
Copyright 2024 p5x.
Licensed under the Apache License, Version 2.0 (the "License");
you may not use this file except in compliance with the License.
You may obtain a copy of the License at
http://www.apache.org/licenses/LICENSE-2.0
Unless required by applicable law or agreed to in writing, software
distributed under the License is distributed on an "AS IS" BASIS,
WITHOUT WARRANTIES OR CONDITIONS OF ANY KIND, either express or implied.
See the License for the specific language governing permissions and
limitations under the License.
*/
package csi
import (
"context"
"github.com/container-storage-interface/spec/lib/go/csi"
"google.golang.org/grpc/codes"
"google.golang.org/grpc/status"
)
var (
controllerCaps = []csi.ControllerServiceCapability_RPC_Type{
csi.ControllerServiceCapability_RPC_CREATE_DELETE_VOLUME,
}
)
type controllerService struct {
p5x *p5xApi
csi.UnimplementedControllerServer
}
var _ csi.ControllerServer = &controllerService{}
func newControllerService(p5x *p5xApi) controllerService {
return controllerService{
p5x: p5x,
}
}
// CreateVolume creates a volume
func (d *controllerService) CreateVolume(ctx context.Context, request *csi.CreateVolumeRequest) (*csi.CreateVolumeResponse, error) {
if len(request.Name) == 0 {
return nil, status.Error(codes.InvalidArgument, "Volume Name cannot be empty")
}
if request.VolumeCapabilities == nil {
return nil, status.Error(codes.InvalidArgument, "Volume Capabilities cannot be empty")
}
requiredCap := request.CapacityRange.GetRequiredBytes()
vol, err := d.p5x.CreateVolume(request.Name, requiredCap)
if err != nil {
return nil, err
}
csiVol := csi.Volume{
VolumeId: vol.Name,
CapacityBytes: vol.SizeInBytes,
}
return &csi.CreateVolumeResponse{Volume: &csiVol}, nil
// volCtx := make(map[string]string)
// for k, v := range request.Parameters {
// volCtx[k] = v
// }
//
// volCtx["subPath"] = request.Name
//
// volume := csi.Volume{
// VolumeId: request.Name,
// CapacityBytes: requiredCap,
// VolumeContext: volCtx,
// }
}
// DeleteVolume deletes a volume
func (d *controllerService) DeleteVolume(ctx context.Context, request *csi.DeleteVolumeRequest) (*csi.DeleteVolumeResponse, error) {
vol, err := d.p5x.GetVolumeByName(request.VolumeId)
if err != nil {
return nil, err
}
err = d.p5x.DeleteVolume(vol)
if err != nil {
return nil, err
}
return &csi.DeleteVolumeResponse{}, nil
}
// ControllerGetCapabilities get controller capabilities
func (d *controllerService) ControllerGetCapabilities(ctx context.Context, request *csi.ControllerGetCapabilitiesRequest) (*csi.ControllerGetCapabilitiesResponse, error) {
var caps []*csi.ControllerServiceCapability
for _, cap := range controllerCaps {
c := &csi.ControllerServiceCapability{
Type: &csi.ControllerServiceCapability_Rpc{
Rpc: &csi.ControllerServiceCapability_RPC{
Type: cap,
},
},
}
caps = append(caps, c)
}
return &csi.ControllerGetCapabilitiesResponse{Capabilities: caps}, nil
}
// ControllerPublishVolume publish a volume
func (d *controllerService) ControllerPublishVolume(ctx context.Context, request *csi.ControllerPublishVolumeRequest) (*csi.ControllerPublishVolumeResponse, error) {
return nil, status.Error(codes.Unimplemented, "")
}
// ControllerUnpublishVolume unpublish a volume
func (d *controllerService) ControllerUnpublishVolume(ctx context.Context, request *csi.ControllerUnpublishVolumeRequest) (*csi.ControllerUnpublishVolumeResponse, error) {
return nil, status.Error(codes.Unimplemented, "")
}
// ValidateVolumeCapabilities validate volume capabilities
func (d *controllerService) ValidateVolumeCapabilities(ctx context.Context, request *csi.ValidateVolumeCapabilitiesRequest) (*csi.ValidateVolumeCapabilitiesResponse, error) {
return nil, status.Error(codes.Unimplemented, "")
}
// ListVolumes list volumes
func (d *controllerService) ListVolumes(ctx context.Context, request *csi.ListVolumesRequest) (*csi.ListVolumesResponse, error) {
return nil, status.Error(codes.Unimplemented, "")
}
// GetCapacity get capacity
func (d *controllerService) GetCapacity(ctx context.Context, request *csi.GetCapacityRequest) (*csi.GetCapacityResponse, error) {
return nil, status.Error(codes.Unimplemented, "")
}
// CreateSnapshot create a snapshot
func (d *controllerService) CreateSnapshot(ctx context.Context, request *csi.CreateSnapshotRequest) (*csi.CreateSnapshotResponse, error) {
return nil, status.Error(codes.Unimplemented, "")
}
// DeleteSnapshot delete a snapshot
func (d *controllerService) DeleteSnapshot(ctx context.Context, request *csi.DeleteSnapshotRequest) (*csi.DeleteSnapshotResponse, error) {
return nil, status.Error(codes.Unimplemented, "")
}
// ListSnapshots list snapshots
func (d *controllerService) ListSnapshots(ctx context.Context, request *csi.ListSnapshotsRequest) (*csi.ListSnapshotsResponse, error) {
return nil, status.Error(codes.Unimplemented, "")
}
// ControllerExpandVolume expand a volume
func (d *controllerService) ControllerExpandVolume(ctx context.Context, request *csi.ControllerExpandVolumeRequest) (*csi.ControllerExpandVolumeResponse, error) {
return nil, status.Error(codes.Unimplemented, "")
}
// ControllerGetVolume get a volume
func (d *controllerService) ControllerGetVolume(ctx context.Context, request *csi.ControllerGetVolumeRequest) (*csi.ControllerGetVolumeResponse, error) {
return nil, status.Error(codes.Unimplemented, "")
}
// ControllerModifyVolume modify a volume
func (d *controllerService) ControllerModifyVolume(ctx context.Context, request *csi.ControllerModifyVolumeRequest) (*csi.ControllerModifyVolumeResponse, error) {
return nil, status.Error(codes.Unimplemented, "")
}

121
pkg/csi/driver.go Normal file
View File

@@ -0,0 +1,121 @@
/*
Copyright 2024 p5x.
Licensed under the Apache License, Version 2.0 (the "License");
you may not use this file except in compliance with the License.
You may obtain a copy of the License at
http://www.apache.org/licenses/LICENSE-2.0
Unless required by applicable law or agreed to in writing, software
distributed under the License is distributed on an "AS IS" BASIS,
WITHOUT WARRANTIES OR CONDITIONS OF ANY KIND, either express or implied.
See the License for the specific language governing permissions and
limitations under the License.
*/
package csi
import (
"context"
"fmt"
"net"
"net/url"
"os"
"path"
"path/filepath"
"strings"
csi "github.com/container-storage-interface/spec/lib/go/csi"
"google.golang.org/grpc"
"k8s.io/klog"
)
const (
// DriverName to be registered
DriverName = "p5x"
)
type Driver struct {
controllerService
nodeService
srv *grpc.Server
endpoint string
P5x *p5xApi
csi.UnimplementedIdentityServer
}
// NewDriver creates a new driver
func NewDriver(endpoint string, nodeID string, p5xEndpoint string, p5xToken string, p5xPort int64) *Driver {
klog.Infof("Driver: %v version %v commit %v date %v", DriverName, driverVersion, gitCommit, buildDate)
p5x := &p5xApi{
endpoint: p5xEndpoint,
token: p5xToken,
port: p5xPort,
}
return &Driver{
endpoint: endpoint,
controllerService: newControllerService(p5x),
nodeService: newNodeService(nodeID),
P5x: p5x,
}
}
func (d *Driver) Run() error {
scheme, addr, err := ParseEndpoint(d.endpoint)
if err != nil {
return err
}
listener, err := net.Listen(scheme, addr)
if err != nil {
return err
}
logErr := func(ctx context.Context, req interface{}, info *grpc.UnaryServerInfo, handler grpc.UnaryHandler) (interface{}, error) {
resp, err := handler(ctx, req)
if err != nil {
klog.Errorf("GRPC error: %v", err)
}
return resp, err
}
opts := []grpc.ServerOption{
grpc.UnaryInterceptor(logErr),
}
d.srv = grpc.NewServer(opts...)
csi.RegisterIdentityServer(d.srv, d)
csi.RegisterControllerServer(d.srv, d)
csi.RegisterNodeServer(d.srv, d)
klog.Infof("Listening for connection on address: %#v", listener.Addr())
return d.srv.Serve(listener)
}
func ParseEndpoint(endpoint string) (string, string, error) {
u, err := url.Parse(endpoint)
if err != nil {
return "", "", fmt.Errorf("could not parse endpoint: %v", err)
}
addr := path.Join(u.Host, filepath.FromSlash(u.Path))
scheme := strings.ToLower(u.Scheme)
switch scheme {
case "tcp":
case "unix":
addr = path.Join("/", addr)
if err := os.Remove(addr); err != nil && !os.IsNotExist(err) {
return "", "", fmt.Errorf("could not remove unix domain socket %q: %v", addr, err)
}
default:
return "", "", fmt.Errorf("unsupported protocol: %s", scheme)
}
return scheme, addr, nil
}

53
pkg/csi/identity.go Normal file
View File

@@ -0,0 +1,53 @@
/*
Copyright 2024 p5x.
Licensed under the Apache License, Version 2.0 (the "License");
you may not use this file except in compliance with the License.
You may obtain a copy of the License at
http://www.apache.org/licenses/LICENSE-2.0
Unless required by applicable law or agreed to in writing, software
distributed under the License is distributed on an "AS IS" BASIS,
WITHOUT WARRANTIES OR CONDITIONS OF ANY KIND, either express or implied.
See the License for the specific language governing permissions and
limitations under the License.
*/
package csi
import (
"context"
"github.com/container-storage-interface/spec/lib/go/csi"
)
// GetPluginInfo returns the name and version of the plugin
func (d *Driver) GetPluginInfo(ctx context.Context, request *csi.GetPluginInfoRequest) (*csi.GetPluginInfoResponse, error) {
resp := &csi.GetPluginInfoResponse{
Name: DriverName,
VendorVersion: "v1",
}
return resp, nil
}
// GetPluginCapabilities returns the capabilities of the plugin
func (d *Driver) GetPluginCapabilities(ctx context.Context, request *csi.GetPluginCapabilitiesRequest) (*csi.GetPluginCapabilitiesResponse, error) {
resp := &csi.GetPluginCapabilitiesResponse{
Capabilities: []*csi.PluginCapability{
{
Type: &csi.PluginCapability_Service_{
Service: &csi.PluginCapability_Service{
Type: csi.PluginCapability_Service_CONTROLLER_SERVICE,
},
},
},
},
}
return resp, nil
}
// Probe returns the health and readiness of the plugin
func (d *Driver) Probe(ctx context.Context, request *csi.ProbeRequest) (*csi.ProbeResponse, error) {
return &csi.ProbeResponse{}, nil
}

168
pkg/csi/node.go Normal file
View File

@@ -0,0 +1,168 @@
/*
Copyright 2024 p5x.
Licensed under the Apache License, Version 2.0 (the "License");
you may not use this file except in compliance with the License.
You may obtain a copy of the License at
http://www.apache.org/licenses/LICENSE-2.0
Unless required by applicable law or agreed to in writing, software
distributed under the License is distributed on an "AS IS" BASIS,
WITHOUT WARRANTIES OR CONDITIONS OF ANY KIND, either express or implied.
See the License for the specific language governing permissions and
limitations under the License.
*/
package csi
import (
"context"
"github.com/container-storage-interface/spec/lib/go/csi"
"google.golang.org/grpc/codes"
"google.golang.org/grpc/status"
)
var (
volumeCaps = []csi.VolumeCapability_AccessMode{
{
Mode: csi.VolumeCapability_AccessMode_SINGLE_NODE_WRITER,
},
{
Mode: csi.VolumeCapability_AccessMode_MULTI_NODE_MULTI_WRITER,
},
{
Mode: csi.VolumeCapability_AccessMode_MULTI_NODE_READER_ONLY,
},
}
)
type nodeService struct {
nodeID string
csi.UnimplementedNodeServer
}
var _ csi.NodeServer = &nodeService{}
func newNodeService(nodeID string) nodeService {
return nodeService{
nodeID: nodeID,
}
}
// NodeStageVolume is called by the CO when a workload that wants to use the specified volume is placed (scheduled) on a node.
func (n *nodeService) NodeStageVolume(ctx context.Context, request *csi.NodeStageVolumeRequest) (*csi.NodeStageVolumeResponse, error) {
return nil, status.Error(codes.Unimplemented, "")
}
// NodeUnstageVolume is called by the CO when a workload that was using the specified volume is being moved to a different node.
func (n *nodeService) NodeUnstageVolume(ctx context.Context, request *csi.NodeUnstageVolumeRequest) (*csi.NodeUnstageVolumeResponse, error) {
return nil, status.Error(codes.Unimplemented, "")
}
// NodePublishVolume mounts the volume on the node.
func (n *nodeService) NodePublishVolume(ctx context.Context, request *csi.NodePublishVolumeRequest) (*csi.NodePublishVolumeResponse, error) {
volumeID := request.GetVolumeId()
if len(volumeID) == 0 {
return nil, status.Error(codes.InvalidArgument, "Volume id not provided")
}
target := request.GetTargetPath()
if len(target) == 0 {
return nil, status.Error(codes.InvalidArgument, "Target path not provided")
}
volCap := request.GetVolumeCapability()
if volCap == nil {
return nil, status.Error(codes.InvalidArgument, "Volume capability not provided")
}
if !isValidVolumeCapabilities([]*csi.VolumeCapability{volCap}) {
return nil, status.Error(codes.InvalidArgument, "Volume capability not supported")
}
readOnly := false
if request.GetReadonly() || request.VolumeCapability.AccessMode.GetMode() == csi.VolumeCapability_AccessMode_MULTI_NODE_READER_ONLY {
readOnly = true
}
options := make(map[string]string)
if m := volCap.GetMount(); m != nil {
for _, f := range m.MountFlags {
// get mountOptions from PV.spec.mountOptions
options[f] = ""
}
}
if readOnly {
// Todo add readonly in your mount options
}
// TODO modify your volume mount logic here
return &csi.NodePublishVolumeResponse{}, nil
}
// NodeUnpublishVolume unmount the volume from the target path
func (n *nodeService) NodeUnpublishVolume(ctx context.Context, request *csi.NodeUnpublishVolumeRequest) (*csi.NodeUnpublishVolumeResponse, error) {
target := request.GetTargetPath()
if len(target) == 0 {
return nil, status.Error(codes.InvalidArgument, "Target path not provided")
}
// TODO modify your volume umount logic here
return &csi.NodeUnpublishVolumeResponse{}, nil
}
// NodeGetVolumeStats get the volume stats
func (n *nodeService) NodeGetVolumeStats(ctx context.Context, request *csi.NodeGetVolumeStatsRequest) (*csi.NodeGetVolumeStatsResponse, error) {
return nil, status.Error(codes.Unimplemented, "")
}
// NodeExpandVolume expand the volume
func (n *nodeService) NodeExpandVolume(ctx context.Context, request *csi.NodeExpandVolumeRequest) (*csi.NodeExpandVolumeResponse, error) {
return nil, status.Error(codes.Unimplemented, "")
}
// NodeGetCapabilities get the node capabilities
func (n *nodeService) NodeGetCapabilities(ctx context.Context, request *csi.NodeGetCapabilitiesRequest) (*csi.NodeGetCapabilitiesResponse, error) {
return &csi.NodeGetCapabilitiesResponse{}, nil
}
// NodeGetInfo get the node info
func (n *nodeService) NodeGetInfo(ctx context.Context, request *csi.NodeGetInfoRequest) (*csi.NodeGetInfoResponse, error) {
return &csi.NodeGetInfoResponse{NodeId: n.nodeID}, nil
}
func isValidVolumeCapabilities(volCaps []*csi.VolumeCapability) bool {
hasSupport := func(cap *csi.VolumeCapability) bool {
if csi.VolumeCapability_AccessMode_SINGLE_NODE_WRITER == cap.AccessMode.GetMode() {
return true
}
if csi.VolumeCapability_AccessMode_MULTI_NODE_MULTI_WRITER == cap.AccessMode.GetMode() {
return true
}
if csi.VolumeCapability_AccessMode_MULTI_NODE_READER_ONLY == cap.AccessMode.GetMode() {
return true
}
// for _, c := range volumeCaps {
// if c.GetMode() == cap.AccessMode.GetMode() {
// return true
// }
// }
return false
}
foundAll := true
for _, c := range volCaps {
if !hasSupport(c) {
foundAll = false
}
}
return foundAll
}

97
pkg/csi/p5x.go Normal file
View File

@@ -0,0 +1,97 @@
package csi
import (
"bytes"
"encoding/json"
"fmt"
"google.golang.org/grpc/codes"
"google.golang.org/grpc/status"
"io"
"k8s.io/klog"
"net/http"
)
type p5xApi struct {
endpoint string
port int64
token string
}
type p5xVolume struct {
VolumeId int64 `json:"volumeId"`
Name string `json:"name"`
SizeInBytes int64 `json:"sizeInBytes"`
}
func (p5x *p5xApi) CreateVolume(name string, sizeInBytes int64) (*p5xVolume, error) {
vol := &p5xVolume{
VolumeId: 0,
Name: name,
SizeInBytes: sizeInBytes,
}
body, err := json.Marshal(vol)
if err != nil {
return nil, err
}
resBody, err := p5x.MakeRequest(http.MethodPost, "volumes", body)
if err != nil {
return nil, err
}
err = json.Unmarshal(resBody, vol)
klog.Info("Successfully created volume: ", vol)
return vol, nil
}
func (p5x *p5xApi) GetVolumeByName(name string) (*p5xVolume, error) {
return nil, status.Error(codes.Unimplemented, "")
}
func (p5x *p5xApi) GetVolumeById(volumeId int64) (*p5xVolume, error) {
return nil, status.Error(codes.Unimplemented, "")
}
func (p5x *p5xApi) DeleteVolume(volume *p5xVolume) error {
route := fmt.Sprintf("volumes/%s", volume.Name)
resBody, err := p5x.MakeRequest(http.MethodDelete, route, []byte(`{}`))
if err != nil {
return err
}
klog.Infof("Successfully deleted volume %s: %s", volume.Name, resBody)
return nil
}
func (p5x *p5xApi) MakeRequest(method string, route string, body []byte) ([]byte, error) {
bodyReader := bytes.NewReader(body)
url := fmt.Sprintf("%s:%d/api/v1/%s", p5x.endpoint, p5x.port, route)
klog.Infof("p5x.MakeRequest: [%s] %s", method, url)
klog.Infof("p5x.MakeRequest: %s", body)
req, err := http.NewRequest(method, url, bodyReader)
if err != nil {
klog.Errorf("p5x.MakeRequest: could not create request: %s\n", err)
return nil, err
}
req.Header.Set("Accept", "application/json")
req.Header.Set("Authorization", fmt.Sprintf("Bearer %s", p5x.token))
req.Header.Set("Content-Type", "application/json")
res, err := http.DefaultClient.Do(req)
if err != nil {
klog.Errorf("p5x.MakeRequest: error executing request: %s\n", err)
return nil, err
}
resBody, err := io.ReadAll(res.Body)
if err != nil {
klog.Errorf("p5x.MakeRequest: could not read response body: %s\n", err)
return nil, err
}
klog.Infof("p5x.MakeRequest: response body: %s\n", resBody)
return resBody, nil
}

62
pkg/csi/version.go Normal file
View File

@@ -0,0 +1,62 @@
/*
Copyright 2024 p5x.
Licensed under the Apache License, Version 2.0 (the "License");
you may not use this file except in compliance with the License.
You may obtain a copy of the License at
http://www.apache.org/licenses/LICENSE-2.0
Unless required by applicable law or agreed to in writing, software
distributed under the License is distributed on an "AS IS" BASIS,
WITHOUT WARRANTIES OR CONDITIONS OF ANY KIND, either express or implied.
See the License for the specific language governing permissions and
limitations under the License.
*/
package csi
import (
"encoding/json"
"fmt"
"runtime"
)
// These are set during build time via -ldflags
var (
driverVersion string
gitCommit string
buildDate string
)
// VersionInfo struct
type VersionInfo struct {
DriverVersion string
GitCommit string
BuildDate string
GoVersion string
Compiler string
Platform string
}
// GetVersion returns VersionInfo
func GetVersion() VersionInfo {
return VersionInfo{
DriverVersion: driverVersion,
GitCommit: gitCommit,
BuildDate: buildDate,
GoVersion: runtime.Version(),
Compiler: runtime.Compiler,
Platform: fmt.Sprintf("%s/%s", runtime.GOOS, runtime.GOARCH),
}
}
// GetVersionJSON returns version in JSON
func GetVersionJSON() (string, error) {
info := GetVersion()
marshalled, err := json.MarshalIndent(&info, "", " ")
if err != nil {
return "", err
}
return string(marshalled), nil
}