diff --git a/bootstrap.sh b/bootstrap.sh index 8576ea4..5e9b604 100755 --- a/bootstrap.sh +++ b/bootstrap.sh @@ -1,7 +1,5 @@ #!/bin/bash -mkdir -p "$1" - cd "$1" echo "CD'd to directory: $1" diff --git a/common/config.go b/common/config.go new file mode 100644 index 0000000..58360db --- /dev/null +++ b/common/config.go @@ -0,0 +1,29 @@ +package common + +import ( + "k8s.io/kubernetes/pkg/api" +) + +type Config struct { + LogFile string `group:"config" namespace:"config"` + Plex *PlexConfig `group:"plex config" namespace:"plex"` + + Kubernetes *KubernetesConfig `group:"kubernetes executor" namespace:"kubernetes"` +} + +type PlexConfig struct { + URL string + + TranscodeDir string `yaml:"transcodeDir"` + MediaDir string `yaml:"mediaDir"` +} + +type KubernetesConfig struct { + ProxyURL string `yaml:"proxyUrl"` + Namespace string + PodBasename string `yaml:"podBasename"` + Image string + + TranscodeVolumeSource api.VolumeSource `yaml:"transcodeVolumeSource"` + MediaVolumeSource api.VolumeSource `yaml:"mediaVolumeSource"` +} diff --git a/common/executors.go b/common/executors.go index da9edd6..4a759d6 100644 --- a/common/executors.go +++ b/common/executors.go @@ -1,40 +1,52 @@ package common import ( - "errors" "fmt" - "github.com/munnerz/plex-elastic-transcoder/executors" - log "github.com/Sirupsen/logrus" ) -type ExecutorFactory struct { - Create func(executors.Job) executors.Executor +type ExecutorPhase string + +const ( + ExecutorPhasePreparing ExecutorPhase = "Preparing" + ExecutorPhaseRunning ExecutorPhase = "Running" + ExecutorPhaseSucceeded ExecutorPhase = "Succeeded" + ExecutorPhaseFailed ExecutorPhase = "Failed" + ExecutorPhaseUnknown ExecutorPhase = "Unknown" +) + +type Executor interface { + Start() error + Stop() error + WaitForState(ExecutorPhase) error + String() string } -var executorFactories map[string]ExecutorFactory +type Job struct { + Args []string +} -func CreateExecutor(j executors.Job) executors.Executor { - // TODO: Some sort of executor selection algorithm +type ExecutorFactory struct { + Create func(Config, Job) Executor +} + +var executorFactories = make(map[string]ExecutorFactory) + +func CreateExecutor(config Config, j Job) (Executor, error) { + // TODO: Select which executor to use based on config for _, e := range executorFactories { - // TODO: Here, run the create command defined in the factory - return e.Create(j) - // Hacky way to dispatch to the first executor + return e.Create(config, j), nil } - panic("No executors registered!") + return nil, fmt.Errorf("no configured executor found") } func RegisterExecutor(name string, e ExecutorFactory) error { - if executorFactories == nil { - executorFactories = make(map[string]ExecutorFactory) - } - if _, ok := executorFactories[name]; ok { - return errors.New(fmt.Sprintf("Executor already registered: %s", name)) + return fmt.Errorf(fmt.Sprintf("executor already registered: %s", name)) } - log.Print("Registered executor: ", name) + log.Infof("Registered executor: %s", name) executorFactories[name] = e return nil -} \ No newline at end of file +} diff --git a/executors/abstract.go b/executors/abstract.go index b2fc743..50c5695 100644 --- a/executors/abstract.go +++ b/executors/abstract.go @@ -2,34 +2,15 @@ package executors import ( "fmt" + + "github.com/munnerz/plex-elastic-transcoder/common" ) -type Job struct { - Command []string - Args []string -} - -type Executor interface { - Start() error - Stop() error - WaitForState(ExecutorPhase) error - String() string -} - type AbstractExecutor struct { - Job Job + Config common.Config + Job common.Job } -type ExecutorPhase string - -const ( - ExecutorPreparing ExecutorPhase = "Preparing" - ExecutorRunning ExecutorPhase = "Running" - ExecutorSucceeded ExecutorPhase = "Succeeded" - ExecutorFailed ExecutorPhase = "Failed" - ExecutorUnknown ExecutorPhase = "Unknown" -) - func (e *AbstractExecutor) Start() error { return nil } @@ -38,10 +19,10 @@ func (e *AbstractExecutor) Stop() error { return nil } -func (e *AbstractExecutor) WaitForState(p ExecutorPhase) error { +func (e *AbstractExecutor) WaitForState(p common.ExecutorPhase) error { return nil } func (e *AbstractExecutor) String() string { - return fmt.Sprintf("%s %s", e.Job.Command, e.Job.Args) + return fmt.Sprintf("%s", e.Job.Args) } \ No newline at end of file diff --git a/executors/kubernetes/kubernetes_executor.go b/executors/kubernetes/kubernetes_executor.go index 465dac5..4f5145e 100644 --- a/executors/kubernetes/kubernetes_executor.go +++ b/executors/kubernetes/kubernetes_executor.go @@ -1,82 +1,60 @@ package kubernetes import ( - "time" - "errors" "fmt" - - "k8s.io/kubernetes/pkg/api" - client "k8s.io/kubernetes/pkg/client/unversioned" + "time" log "github.com/Sirupsen/logrus" + "k8s.io/kubernetes/pkg/api" + stableApi "k8s.io/kubernetes/pkg/api/unversioned" + "k8s.io/kubernetes/pkg/client/restclient" + client "k8s.io/kubernetes/pkg/client/unversioned" - "github.com/munnerz/plex-elastic-transcoder/executors" "github.com/munnerz/plex-elastic-transcoder/common" + "github.com/munnerz/plex-elastic-transcoder/executors" ) -const podBasename = "plex-transcoder" -const kubernetesHost = "10.20.40.254:8080" -const kubernetesNamespace = "plex" -const dockerImage = "registry.marley.xyz/munnerz/plex-new-transcoder" - type KubernetesExecutor struct { executors.AbstractExecutor - Host string - Namespace string - Image string - - pod *api.Pod - client *client.Client + pod *api.Pod + client *client.Client } func (e *KubernetesExecutor) createPod() *api.Pod { - fmt.Println("Args: ", e.Job.Args) return &api.Pod{ - TypeMeta: api.TypeMeta{ + TypeMeta: stableApi.TypeMeta{ Kind: "Pod", }, ObjectMeta: api.ObjectMeta{ - GenerateName: podBasename, - Namespace: e.Namespace, + GenerateName: fmt.Sprintf("%s-", e.Config.Kubernetes.PodBasename), + Namespace: e.Config.Kubernetes.Namespace, }, Spec: api.PodSpec{ Volumes: []api.Volume{ api.Volume{ - Name: "source-dir", - VolumeSource: api.VolumeSource { - NFS: &api.NFSVolumeSource { - Server: "10.12.14.16", - Path: "/tank/media", - ReadOnly: true, - }, - }, + Name: "source-dir", + VolumeSource: e.Config.Kubernetes.MediaVolumeSource, }, api.Volume{ - Name: "transcode-dir", - VolumeSource: api.VolumeSource { - NFS: &api.NFSVolumeSource { - Server: "storage.kube.marley.xyz", - Path: "/shares/containers/plex/transcode", - }, - }, + Name: "transcode-dir", + VolumeSource: e.Config.Kubernetes.TranscodeVolumeSource, }, }, RestartPolicy: api.RestartPolicyNever, Containers: []api.Container{ api.Container{ - Name: podBasename, - Image: e.Image, - Command: e.Job.Command, - Args: e.Job.Args, + Name: "plex-new-transcoder", + Image: e.Config.Kubernetes.Image, + Args: e.Job.Args, VolumeMounts: []api.VolumeMount{ api.VolumeMount{ - Name: "source-dir", - MountPath: "/data", + Name: "source-dir", + MountPath: e.Config.Plex.MediaDir, }, api.VolumeMount{ - Name: "transcode-dir", - MountPath: "/tmp", + Name: "transcode-dir", + MountPath: e.Config.Plex.TranscodeDir, }, }, ImagePullPolicy: api.PullAlways, @@ -87,31 +65,30 @@ func (e *KubernetesExecutor) createPod() *api.Pod { } func (e *KubernetesExecutor) Start() error { - log.Print("Executing job: ", e) + log.Debugf("executing job: %s", e.Job) - e.pod = e.createPod() - - client, err := client.New( - &client.Config{ - Host: e.Host, + if len(e.Config.Kubernetes.ProxyURL) > 0 { + e.client = client.NewOrDie(&restclient.Config{ + Host: e.Config.Kubernetes.ProxyURL, }) + } else { + var err error + e.client, err = client.NewInCluster() + if err != nil { + return fmt.Errorf("failed to create kubernetes client: %s", err.Error()) + } + } + + pod := e.createPod() + + var err error + e.pod, err = e.client.Pods(pod.ObjectMeta.Namespace).Create(pod) if err != nil { return err } - e.client = client - - log.Print("Creating pod...") - pod, err := e.client.Pods(e.pod.ObjectMeta.Namespace).Create(e.pod) - - if err != nil { - return err - } - - e.pod = pod - - log.Print("Successfully scheduled job on cluster with name: ", e.pod.ObjectMeta.Name) + log.Debugf("successfully scheduled job on cluster with name: %s", e.pod.ObjectMeta.Name) return nil } @@ -120,35 +97,37 @@ func (e *KubernetesExecutor) Stop() error { return e.client.Pods(e.pod.ObjectMeta.Namespace).Delete(e.pod.ObjectMeta.Name, nil) } -func podPhaseToExecutorPhase(in api.PodPhase) executors.ExecutorPhase { +func podPhaseToExecutorPhase(in api.PodPhase) common.ExecutorPhase { switch in { case api.PodRunning: - return executors.ExecutorRunning + return common.ExecutorPhaseRunning case api.PodPending: - return executors.ExecutorPreparing + return common.ExecutorPhasePreparing case api.PodSucceeded: - return executors.ExecutorSucceeded + return common.ExecutorPhaseSucceeded case api.PodFailed: - return executors.ExecutorFailed + return common.ExecutorPhaseFailed default: - return executors.ExecutorUnknown + return common.ExecutorPhaseUnknown } } -func (e *KubernetesExecutor) WaitForState(targetState executors.ExecutorPhase) error { - Loop: +func (e *KubernetesExecutor) WaitForState(targetState common.ExecutorPhase) error { +Loop: for { pod, err := e.client.Pods(e.pod.ObjectMeta.Namespace).Get(e.pod.ObjectMeta.Name) if err != nil { return err } - Switch: + log.Debugf("pod state: %s", pod.Status.Phase) + + Switch: switch podPhaseToExecutorPhase(pod.Status.Phase) { case targetState: break Loop - case executors.ExecutorFailed: - return errors.New(fmt.Sprintf("Pod failed whilst waiting for state: %s\nReason: %s", targetState, pod.Status.Reason)) + case common.ExecutorPhaseFailed: + return fmt.Errorf("pod failed whilst waiting for state '%s': %s", targetState, pod.Status.Reason) default: break Switch } @@ -159,14 +138,13 @@ func (e *KubernetesExecutor) WaitForState(targetState executors.ExecutorPhase) e func init() { common.RegisterExecutor("kubernetes", common.ExecutorFactory{ - Create: func(j executors.Job) executors.Executor { - e := &KubernetesExecutor{ - Host: kubernetesHost, - Namespace: kubernetesNamespace, - Image: dockerImage, + Create: func(config common.Config, job common.Job) common.Executor { + return &KubernetesExecutor{ + AbstractExecutor: executors.AbstractExecutor{ + Config: config, + Job: job, + }, } - e.Job = j - return e }, }) -} \ No newline at end of file +} diff --git a/main.go b/main.go index 65f024b..c114219 100644 --- a/main.go +++ b/main.go @@ -1,27 +1,23 @@ package main import ( + "io/ioutil" "os" "os/signal" - "syscall" - "fmt" "strings" + "syscall" log "github.com/Sirupsen/logrus" + "github.com/go-yaml/yaml" "github.com/munnerz/plex-elastic-transcoder/common" - "github.com/munnerz/plex-elastic-transcoder/executors" _ "github.com/munnerz/plex-elastic-transcoder/executors/kubernetes" ) -const ( - cmdPath = "/plexmediaserver/bootstrap.sh" - logFilePath = "/var/log/plex/plex-elastic-transcoder.log" - plexServerURL = "10.20.40.60:32400" -) +const configFile = "/etc/plex-elastic-transcoder/config.yaml" -var executor executors.Executor +var executor common.Executor func signals() { // Signal handling @@ -31,91 +27,124 @@ func signals() { signal.Notify(c, syscall.SIGTERM) go func() { <-c - log.Print("Shutting down...") + log.Infof("shutting down...") if executor != nil { - log.Print("Terminating running job...") + log.Infof("terminating running job...") err := executor.Stop() if err != nil { - log.Fatal("Error terminating job: ", err) + log.Fatalf("error terminating job: %s", err.Error()) os.Exit(1) } - log.Print("Successfully terminated running job.") + log.Infof("successfully terminated running job") } os.Exit(0) }() } +func loadConfig() (*common.Config, error) { + data, err := ioutil.ReadFile(configFile) + + if err != nil { + return nil, err + } + + config := new(common.Config) + + err = yaml.Unmarshal(data, config) + + return config, err +} + func main() { + log.SetLevel(log.DebugLevel) + config, err := loadConfig() + + if err != nil { + log.Fatalf("error loading config: %s", err.Error()) + } + + log.Debugf("loaded config: %s", config) + // Setup signals signals() // Setup logs - fo, err := os.Create(logFilePath) + if len(config.LogFile) > 0 { + fo, err := os.Create(config.LogFile) - if err != nil { - panic(fmt.Sprintf("Error opening log file: %s", err)) - } - - defer func() { - if err := fo.Close(); err != nil { - panic(fmt.Sprintf("Error closing file: %s", err)) + if err != nil { + log.Fatalf("error opening log file: %s", err) } - }() - log.SetOutput(fo) + log.SetOutput(fo) + defer func() { + if err := fo.Close(); err != nil { + log.Fatalf("error closing file: %s", err) + } + }() + } // Get the arguments passed to Plex New Transcoder args := os.Args[1:] + log.Debugf("executing job with args: %s", args) wd, _ := os.Getwd() for i, arg := range args { - if arg == "-progressurl" { - // Change the progress URL to report to about the transcode - args[i + 1] = strings.Replace(args[i+1], "127.0.0.1:32400", plexServerURL, 1) + switch arg { + case "-progressurl": + log.Debugf("replacing progressURL with: %s", config.Plex.URL) + args[i+1] = strings.Replace(args[i+1], "127.0.0.1:32400", config.Plex.URL, 1) + break + case "-loglevel": + case "loglevel_plex": + args[i+1] = "debug" } } args = append([]string{wd}, args...) - log.Print("In WD: ", wd) - log.Print("Dispatching job with args: ", args) + log.Debugf("current working directory: %s", wd) - job := executors.Job{ - Command: []string{cmdPath}, + job := common.Job{ Args: args, } - executor = common.CreateExecutor(job) + executor, err = common.CreateExecutor(*config, job) - log.Print("Created executor: ", executor) - - err = executor.Start() if err != nil { - log.Fatal("Job start failed with error: ", err) + log.Fatalf("error creating executor: %s", err.Error()) } - log.Print("Waiting for build pod to enter Running state...") + log.Infof("created executor: ", executor) - err = executor.WaitForState(executors.ExecutorRunning) + err = executor.Start() + + if err != nil { + log.Fatalf("failed to start job: %s", err.Error()) + } + + log.Print("waiting for build pod to enter Running state...") + + err = executor.WaitForState(common.ExecutorPhaseRunning) if err != nil { log.Fatal("Error waiting for pod to enter running state: ", err) } - log.Print("Job has started running...") - log.Print("Waiting for job to complete...") + log.Infof("job has started running...") + log.Infof("waiting for job to complete...") - err = executor.WaitForState(executors.ExecutorSucceeded) + err = executor.WaitForState(common.ExecutorPhaseSucceeded) if err != nil { - log.Fatal("Error waiting for job to complete: ", err) + log.Fatalf("error waiting for job to complete: %s", err.Error()) } - log.Print("Job completed. Destroying pod...") + log.Infof("job completed. cleaning up...") err = executor.Stop() if err != nil { - log.Fatal("Error stopping job: ", err) + log.Fatalf("error stopping job: %s", err.Error()) } - log.Print("Pod destroyed. Exiting.") + log.Printf("cleaned up successfully") }