Add support for multiple executor backends. Create Executor interface and allow executors to register themselves with the application. Update README (Todo list).

This commit is contained in:
James Munnelly
2015-09-08 10:09:28 +01:00
parent 1c80f8d4a0
commit 06b3caa923
5 changed files with 264 additions and 105 deletions
+40
View File
@@ -0,0 +1,40 @@
package common
import (
"errors"
"fmt"
"github.com/munnerz/plex-elastic-transcoder/executors"
log "github.com/Sirupsen/logrus"
)
type ExecutorFactory struct {
Create func(executors.Job) executors.Executor
}
var executorFactories map[string]ExecutorFactory
func CreateExecutor(j executors.Job) executors.Executor {
// TODO: Some sort of executor selection algorithm
for _, e := range executorFactories {
// TODO: Here, run the create command defined in the factory
return e.Create(j)
// Hacky way to dispatch to the first executor
}
panic("No executors registered!")
}
func RegisterExecutor(name string, e ExecutorFactory) error {
if executorFactories == nil {
executorFactories = make(map[string]ExecutorFactory)
}
if _, ok := executorFactories[name]; ok {
return errors.New(fmt.Sprintf("Executor already registered: %s", name))
}
log.Print("Registered executor: ", name)
executorFactories[name] = e
return nil
}
+47
View File
@@ -0,0 +1,47 @@
package executors
import (
"fmt"
)
type Job struct {
Command []string
Args []string
}
type Executor interface {
Start() error
Stop() error
WaitForState(ExecutorPhase) error
String() string
}
type AbstractExecutor struct {
Job Job
}
type ExecutorPhase string
const (
ExecutorPreparing ExecutorPhase = "Preparing"
ExecutorRunning ExecutorPhase = "Running"
ExecutorSucceeded ExecutorPhase = "Succeeded"
ExecutorFailed ExecutorPhase = "Failed"
ExecutorUnknown ExecutorPhase = "Unknown"
)
func (e *AbstractExecutor) Start() error {
return nil
}
func (e *AbstractExecutor) Stop() error {
return nil
}
func (e *AbstractExecutor) WaitForState(p ExecutorPhase) error {
return nil
}
func (e *AbstractExecutor) String() string {
return fmt.Sprintf("%s %s", e.Job.Command, e.Job.Args)
}
+133
View File
@@ -0,0 +1,133 @@
package kubernetes
import (
"time"
"k8s.io/kubernetes/pkg/api"
"k8s.io/kubernetes/pkg/client"
log "github.com/Sirupsen/logrus"
"github.com/munnerz/plex-elastic-transcoder/executors"
"github.com/munnerz/plex-elastic-transcoder/common"
)
const podBasename = "plex-transcoder"
const kubernetesHost = "10.20.40.254:8080"
const kubernetesNamespace = "plex"
const dockerImage = "registry.marley.xyz/e720/plex-new-transcoder"
type KubernetesExecutor struct {
executors.AbstractExecutor
Host string
Namespace string
Image string
pod *api.Pod
client *client.Client
}
func (e *KubernetesExecutor) createPod() *api.Pod {
return &api.Pod{
TypeMeta: api.TypeMeta{
Kind: "Pod",
},
ObjectMeta: api.ObjectMeta{
GenerateName: podBasename,
Namespace: e.Namespace,
},
Spec: api.PodSpec{
RestartPolicy: api.RestartPolicyNever,
Containers: []api.Container{
api.Container{
Name: podBasename,
Image: e.Image,
Command: e.Job.Command,
Args: e.Job.Args,
},
},
},
}
}
func (e *KubernetesExecutor) Start() error {
log.Print("Executing job: ", e)
e.pod = e.createPod()
client, err := client.New(
&client.Config{
Host: e.Host,
})
if err != nil {
return err
}
e.client = client
log.Print("Creating pod...")
pod, err := e.client.Pods(e.pod.ObjectMeta.Namespace).Create(e.pod)
if err != nil {
return err
}
e.pod = pod
log.Print("Successfully scheduled job on cluster with name: ", e.pod.ObjectMeta.Name)
return nil
}
func (e *KubernetesExecutor) Stop() error {
return e.client.Pods(e.pod.ObjectMeta.Namespace).Delete(e.pod.ObjectMeta.Name, nil)
}
func podPhaseToExecutorPhase(in api.PodPhase) executors.ExecutorPhase {
switch in {
case api.PodRunning:
return executors.ExecutorRunning
case api.PodPending:
return executors.ExecutorPreparing
case api.PodSucceeded:
return executors.ExecutorSucceeded
case api.PodFailed:
return executors.ExecutorFailed
default:
return executors.ExecutorUnknown
}
}
func (e *KubernetesExecutor) WaitForState(targetState executors.ExecutorPhase) error {
Loop:
for {
pod, err := e.client.Pods(e.pod.ObjectMeta.Namespace).Get(e.pod.ObjectMeta.Name)
if err != nil {
return err
}
Switch:
switch podPhaseToExecutorPhase(pod.Status.Phase) {
case targetState:
break Loop
default:
break Switch
}
time.Sleep(1 * time.Second)
}
return nil
}
func init() {
common.RegisterExecutor("kubernetes", common.ExecutorFactory{
Create: func(j executors.Job) executors.Executor {
return &KubernetesExecutor{
Host: kubernetesHost,
Namespace: kubernetesNamespace,
Image: dockerImage,
}
},
})
}
-73
View File
@@ -1,73 +0,0 @@
package job
import (
"time"
"k8s.io/kubernetes/pkg/api"
"k8s.io/kubernetes/pkg/client"
log "github.com/Sirupsen/logrus"
)
type Job struct {
Host string
Pod *api.Pod
client *client.Client
}
func (t Job) String() string {
return t.Pod.ObjectMeta.Name
}
func (t Job) Start() error {
log.Print("Executing job: ", t)
client, err := client.New(
&client.Config{
Host: t.Host,
})
if err != nil {
return err
}
t.client = client
log.Print("Creating pod...")
pod, err := t.client.Pods(t.Pod.ObjectMeta.Namespace).Create(t.Pod)
if err != nil {
return err
}
t.Pod = pod
log.Print("Successfully scheduled job on cluster with name: ", pod.ObjectMeta.Name)
return nil
}
func (t Job) Stop() error {
return t.client.Pods(t.Pod.ObjectMeta.Namespace).Delete(t.Pod.ObjectMeta.Name, nil)
}
func (t Job) WaitForState(targetState api.PodPhase) error {
Loop:
for {
pod, err := t.client.Pods(t.Pod.ObjectMeta.Namespace).Get(t.Pod.ObjectMeta.Name)
if err != nil {
return err
}
Switch:
switch pod.Status.Phase {
case targetState:
break Loop
default:
break Switch
}
time.Sleep(1 * time.Second)
}
return nil
}
+43 -31
View File
@@ -2,56 +2,68 @@ package main
import (
"os"
"os/signal"
"syscall"
log "github.com/Sirupsen/logrus"
"k8s.io/kubernetes/pkg/api"
"github.com/munnerz/plex-elastic-transcoder/common"
"github.com/munnerz/plex-elastic-transcoder/executors"
"github.com/munnerz/plex-elastic-transcoder/job"
_ "github.com/munnerz/plex-elastic-transcoder/executors/kubernetes"
)
const kubernetesHost = "http://10.20.40.254:8080/"
const kubernetesNamespace = "default"
const dockerImage = "registry.marley.xyz/e720/plex-new-transcoder"
const podBasename = "plex-transcoder"
var executor executors.Executor
func signals() {
// Signal handling
c := make(chan os.Signal, 1)
signal.Notify(c, os.Interrupt)
signal.Notify(c, syscall.SIGTERM)
go func() {
<-c
log.Print("Shutting down...")
if executor != nil {
log.Print("Terminating running job...")
err := executor.Stop()
if err != nil {
log.Fatal("Error terminating job: ", err)
os.Exit(1)
}
log.Print("Successfully terminated running job.")
}
os.Exit(0)
}()
}
func main() {
// Setup signals
signals()
// Get the arguments passed to Plex New Transcoder
args := os.Args[1:]
log.Print("Dispatching job with args: ", args)
job := job.Job{
Host: kubernetesHost,
Pod:
&api.Pod{
TypeMeta: api.TypeMeta{
Kind: "Pod",
},
ObjectMeta: api.ObjectMeta{
GenerateName: podBasename,
Namespace: kubernetesNamespace,
},
Spec: api.PodSpec{
RestartPolicy: api.RestartPolicyNever,
Containers: []api.Container{
api.Container{
Name: podBasename,
Image: dockerImage,
job := executors.Job{
Command: []string{"/Plex New Transcoder"},
Args: args,
},
},
},
},
}
err := job.Start()
executor = common.CreateExecutor(job)
log.Print("Created executor: ", executor)
err := executor.Start()
if err != nil {
log.Fatal("Job start failed with error: ", err)
}
log.Print("Waiting for build pod to enter Running state...")
err = job.WaitForState(api.PodRunning)
err = executor.WaitForState(executors.ExecutorRunning)
if err != nil {
log.Fatal("Error waiting for pod to enter running state: ", err)
}
@@ -59,14 +71,14 @@ func main() {
log.Print("Job has started running...")
log.Print("Waiting for job to complete...")
err = job.WaitForState(api.PodSucceeded)
err = executor.WaitForState(executors.ExecutorSucceeded)
if err != nil {
log.Fatal("Error waiting for job to complete: ", err)
}
log.Print("Job completed. Destroying pod...")
err = job.Stop()
err = executor.Stop()
if err != nil {
log.Fatal("Error stopping job: ", err)
}