From 06b3caa923ca1549d531afdffe569a8b69b5dce4 Mon Sep 17 00:00:00 2001 From: James Munnelly Date: Tue, 8 Sep 2015 10:09:28 +0100 Subject: [PATCH] Add support for multiple executor backends. Create Executor interface and allow executors to register themselves with the application. Update README (Todo list). --- common/executors.go | 40 ++++++ executors/abstract.go | 47 +++++++ executors/kubernetes/kubernetes_executor.go | 133 ++++++++++++++++++++ job/job.go | 73 ----------- main.go | 76 ++++++----- 5 files changed, 264 insertions(+), 105 deletions(-) create mode 100644 common/executors.go create mode 100644 executors/abstract.go create mode 100644 executors/kubernetes/kubernetes_executor.go delete mode 100644 job/job.go diff --git a/common/executors.go b/common/executors.go new file mode 100644 index 0000000..da9edd6 --- /dev/null +++ b/common/executors.go @@ -0,0 +1,40 @@ +package common + +import ( + "errors" + "fmt" + + "github.com/munnerz/plex-elastic-transcoder/executors" + + log "github.com/Sirupsen/logrus" +) + +type ExecutorFactory struct { + Create func(executors.Job) executors.Executor +} + +var executorFactories map[string]ExecutorFactory + +func CreateExecutor(j executors.Job) executors.Executor { + // TODO: Some sort of executor selection algorithm + for _, e := range executorFactories { + // TODO: Here, run the create command defined in the factory + return e.Create(j) + // Hacky way to dispatch to the first executor + } + panic("No executors registered!") +} + +func RegisterExecutor(name string, e ExecutorFactory) error { + if executorFactories == nil { + executorFactories = make(map[string]ExecutorFactory) + } + + if _, ok := executorFactories[name]; ok { + return errors.New(fmt.Sprintf("Executor already registered: %s", name)) + } + + log.Print("Registered executor: ", name) + executorFactories[name] = e + return nil +} \ No newline at end of file diff --git a/executors/abstract.go b/executors/abstract.go new file mode 100644 index 0000000..b2fc743 --- /dev/null +++ b/executors/abstract.go @@ -0,0 +1,47 @@ +package executors + +import ( + "fmt" +) + +type Job struct { + Command []string + Args []string +} + +type Executor interface { + Start() error + Stop() error + WaitForState(ExecutorPhase) error + String() string +} + +type AbstractExecutor struct { + Job Job +} + +type ExecutorPhase string + +const ( + ExecutorPreparing ExecutorPhase = "Preparing" + ExecutorRunning ExecutorPhase = "Running" + ExecutorSucceeded ExecutorPhase = "Succeeded" + ExecutorFailed ExecutorPhase = "Failed" + ExecutorUnknown ExecutorPhase = "Unknown" +) + +func (e *AbstractExecutor) Start() error { + return nil +} + +func (e *AbstractExecutor) Stop() error { + return nil +} + +func (e *AbstractExecutor) WaitForState(p ExecutorPhase) error { + return nil +} + +func (e *AbstractExecutor) String() string { + return fmt.Sprintf("%s %s", e.Job.Command, e.Job.Args) +} \ No newline at end of file diff --git a/executors/kubernetes/kubernetes_executor.go b/executors/kubernetes/kubernetes_executor.go new file mode 100644 index 0000000..a0180b3 --- /dev/null +++ b/executors/kubernetes/kubernetes_executor.go @@ -0,0 +1,133 @@ +package kubernetes + +import ( + "time" + + "k8s.io/kubernetes/pkg/api" + "k8s.io/kubernetes/pkg/client" + + log "github.com/Sirupsen/logrus" + + "github.com/munnerz/plex-elastic-transcoder/executors" + "github.com/munnerz/plex-elastic-transcoder/common" +) + +const podBasename = "plex-transcoder" +const kubernetesHost = "10.20.40.254:8080" +const kubernetesNamespace = "plex" +const dockerImage = "registry.marley.xyz/e720/plex-new-transcoder" + +type KubernetesExecutor struct { + executors.AbstractExecutor + + Host string + Namespace string + Image string + + pod *api.Pod + client *client.Client +} + +func (e *KubernetesExecutor) createPod() *api.Pod { + return &api.Pod{ + TypeMeta: api.TypeMeta{ + Kind: "Pod", + }, + ObjectMeta: api.ObjectMeta{ + GenerateName: podBasename, + Namespace: e.Namespace, + }, + Spec: api.PodSpec{ + RestartPolicy: api.RestartPolicyNever, + Containers: []api.Container{ + api.Container{ + Name: podBasename, + Image: e.Image, + Command: e.Job.Command, + Args: e.Job.Args, + }, + }, + }, + } +} + +func (e *KubernetesExecutor) Start() error { + log.Print("Executing job: ", e) + + e.pod = e.createPod() + + client, err := client.New( + &client.Config{ + Host: e.Host, + }) + + if err != nil { + return err + } + + e.client = client + + log.Print("Creating pod...") + pod, err := e.client.Pods(e.pod.ObjectMeta.Namespace).Create(e.pod) + + if err != nil { + return err + } + + e.pod = pod + + log.Print("Successfully scheduled job on cluster with name: ", e.pod.ObjectMeta.Name) + + return nil +} + +func (e *KubernetesExecutor) Stop() error { + return e.client.Pods(e.pod.ObjectMeta.Namespace).Delete(e.pod.ObjectMeta.Name, nil) +} + +func podPhaseToExecutorPhase(in api.PodPhase) executors.ExecutorPhase { + switch in { + case api.PodRunning: + return executors.ExecutorRunning + case api.PodPending: + return executors.ExecutorPreparing + case api.PodSucceeded: + return executors.ExecutorSucceeded + case api.PodFailed: + return executors.ExecutorFailed + default: + return executors.ExecutorUnknown + } +} + +func (e *KubernetesExecutor) WaitForState(targetState executors.ExecutorPhase) error { + Loop: + for { + pod, err := e.client.Pods(e.pod.ObjectMeta.Namespace).Get(e.pod.ObjectMeta.Name) + if err != nil { + return err + } + + Switch: + switch podPhaseToExecutorPhase(pod.Status.Phase) { + case targetState: + break Loop + default: + break Switch + } + time.Sleep(1 * time.Second) + } + return nil +} + +func init() { + common.RegisterExecutor("kubernetes", common.ExecutorFactory{ + Create: func(j executors.Job) executors.Executor { + return &KubernetesExecutor{ + Host: kubernetesHost, + Namespace: kubernetesNamespace, + Image: dockerImage, + } + }, + }) +} \ No newline at end of file diff --git a/job/job.go b/job/job.go deleted file mode 100644 index 99dd296..0000000 --- a/job/job.go +++ /dev/null @@ -1,73 +0,0 @@ -package job - -import ( - "time" - - "k8s.io/kubernetes/pkg/api" - "k8s.io/kubernetes/pkg/client" - - log "github.com/Sirupsen/logrus" -) - -type Job struct { - Host string - Pod *api.Pod - - client *client.Client -} - -func (t Job) String() string { - return t.Pod.ObjectMeta.Name -} - -func (t Job) Start() error { - log.Print("Executing job: ", t) - - client, err := client.New( - &client.Config{ - Host: t.Host, - }) - - if err != nil { - return err - } - - t.client = client - - log.Print("Creating pod...") - pod, err := t.client.Pods(t.Pod.ObjectMeta.Namespace).Create(t.Pod) - - if err != nil { - return err - } - - t.Pod = pod - - log.Print("Successfully scheduled job on cluster with name: ", pod.ObjectMeta.Name) - - return nil -} - -func (t Job) Stop() error { - return t.client.Pods(t.Pod.ObjectMeta.Namespace).Delete(t.Pod.ObjectMeta.Name, nil) -} - -func (t Job) WaitForState(targetState api.PodPhase) error { - Loop: - for { - pod, err := t.client.Pods(t.Pod.ObjectMeta.Namespace).Get(t.Pod.ObjectMeta.Name) - if err != nil { - return err - } - - Switch: - switch pod.Status.Phase { - case targetState: - break Loop - default: - break Switch - } - time.Sleep(1 * time.Second) - } - return nil -} \ No newline at end of file diff --git a/main.go b/main.go index 246ebb6..f00910a 100644 --- a/main.go +++ b/main.go @@ -2,56 +2,68 @@ package main import ( "os" + "os/signal" + "syscall" log "github.com/Sirupsen/logrus" - "k8s.io/kubernetes/pkg/api" + "github.com/munnerz/plex-elastic-transcoder/common" + "github.com/munnerz/plex-elastic-transcoder/executors" - "github.com/munnerz/plex-elastic-transcoder/job" + _ "github.com/munnerz/plex-elastic-transcoder/executors/kubernetes" ) -const kubernetesHost = "http://10.20.40.254:8080/" -const kubernetesNamespace = "default" -const dockerImage = "registry.marley.xyz/e720/plex-new-transcoder" -const podBasename = "plex-transcoder" +var executor executors.Executor +func signals() { + // Signal handling + c := make(chan os.Signal, 1) + + signal.Notify(c, os.Interrupt) + signal.Notify(c, syscall.SIGTERM) + go func() { + <-c + log.Print("Shutting down...") + + if executor != nil { + log.Print("Terminating running job...") + err := executor.Stop() + if err != nil { + log.Fatal("Error terminating job: ", err) + os.Exit(1) + } + + log.Print("Successfully terminated running job.") + } + + os.Exit(0) + }() +} func main() { + // Setup signals + signals() + // Get the arguments passed to Plex New Transcoder args := os.Args[1:] log.Print("Dispatching job with args: ", args) - job := job.Job{ - Host: kubernetesHost, - Pod: - &api.Pod{ - TypeMeta: api.TypeMeta{ - Kind: "Pod", - }, - ObjectMeta: api.ObjectMeta{ - GenerateName: podBasename, - Namespace: kubernetesNamespace, - }, - Spec: api.PodSpec{ - RestartPolicy: api.RestartPolicyNever, - Containers: []api.Container{ - api.Container{ - Name: podBasename, - Image: dockerImage, - Args: args, - }, - }, - }, - }, + job := executors.Job{ + Command: []string{"/Plex New Transcoder"}, + Args: args, } - err := job.Start() + executor = common.CreateExecutor(job) + + log.Print("Created executor: ", executor) + + err := executor.Start() if err != nil { log.Fatal("Job start failed with error: ", err) } log.Print("Waiting for build pod to enter Running state...") - err = job.WaitForState(api.PodRunning) + err = executor.WaitForState(executors.ExecutorRunning) if err != nil { log.Fatal("Error waiting for pod to enter running state: ", err) } @@ -59,14 +71,14 @@ func main() { log.Print("Job has started running...") log.Print("Waiting for job to complete...") - err = job.WaitForState(api.PodSucceeded) + err = executor.WaitForState(executors.ExecutorSucceeded) if err != nil { log.Fatal("Error waiting for job to complete: ", err) } log.Print("Job completed. Destroying pod...") - err = job.Stop() + err = executor.Stop() if err != nil { log.Fatal("Error stopping job: ", err) }