Load config from yaml file. Cleanup. Should now actually work.

This commit is contained in:
James Munnelly
2016-04-24 22:05:53 +01:00
parent fd9a28ad5d
commit 724e44f61d
6 changed files with 198 additions and 171 deletions
-2
View File
@@ -1,7 +1,5 @@
#!/bin/bash #!/bin/bash
mkdir -p "$1"
cd "$1" cd "$1"
echo "CD'd to directory: $1" echo "CD'd to directory: $1"
+29
View File
@@ -0,0 +1,29 @@
package common
import (
"k8s.io/kubernetes/pkg/api"
)
type Config struct {
LogFile string `group:"config" namespace:"config"`
Plex *PlexConfig `group:"plex config" namespace:"plex"`
Kubernetes *KubernetesConfig `group:"kubernetes executor" namespace:"kubernetes"`
}
type PlexConfig struct {
URL string
TranscodeDir string `yaml:"transcodeDir"`
MediaDir string `yaml:"mediaDir"`
}
type KubernetesConfig struct {
ProxyURL string `yaml:"proxyUrl"`
Namespace string
PodBasename string `yaml:"podBasename"`
Image string
TranscodeVolumeSource api.VolumeSource `yaml:"transcodeVolumeSource"`
MediaVolumeSource api.VolumeSource `yaml:"mediaVolumeSource"`
}
+30 -18
View File
@@ -1,40 +1,52 @@
package common package common
import ( import (
"errors"
"fmt" "fmt"
"github.com/munnerz/plex-elastic-transcoder/executors"
log "github.com/Sirupsen/logrus" log "github.com/Sirupsen/logrus"
) )
type ExecutorFactory struct { type ExecutorPhase string
Create func(executors.Job) executors.Executor
const (
ExecutorPhasePreparing ExecutorPhase = "Preparing"
ExecutorPhaseRunning ExecutorPhase = "Running"
ExecutorPhaseSucceeded ExecutorPhase = "Succeeded"
ExecutorPhaseFailed ExecutorPhase = "Failed"
ExecutorPhaseUnknown ExecutorPhase = "Unknown"
)
type Executor interface {
Start() error
Stop() error
WaitForState(ExecutorPhase) error
String() string
} }
var executorFactories map[string]ExecutorFactory type Job struct {
Args []string
}
func CreateExecutor(j executors.Job) executors.Executor { type ExecutorFactory struct {
// TODO: Some sort of executor selection algorithm Create func(Config, Job) Executor
}
var executorFactories = make(map[string]ExecutorFactory)
func CreateExecutor(config Config, j Job) (Executor, error) {
// TODO: Select which executor to use based on config
for _, e := range executorFactories { for _, e := range executorFactories {
// TODO: Here, run the create command defined in the factory return e.Create(config, j), nil
return e.Create(j)
// Hacky way to dispatch to the first executor
} }
panic("No executors registered!") return nil, fmt.Errorf("no configured executor found")
} }
func RegisterExecutor(name string, e ExecutorFactory) error { func RegisterExecutor(name string, e ExecutorFactory) error {
if executorFactories == nil {
executorFactories = make(map[string]ExecutorFactory)
}
if _, ok := executorFactories[name]; ok { if _, ok := executorFactories[name]; ok {
return errors.New(fmt.Sprintf("Executor already registered: %s", name)) return fmt.Errorf(fmt.Sprintf("executor already registered: %s", name))
} }
log.Print("Registered executor: ", name) log.Infof("Registered executor: %s", name)
executorFactories[name] = e executorFactories[name] = e
return nil return nil
} }
+6 -25
View File
@@ -2,34 +2,15 @@ package executors
import ( import (
"fmt" "fmt"
"github.com/munnerz/plex-elastic-transcoder/common"
) )
type Job struct {
Command []string
Args []string
}
type Executor interface {
Start() error
Stop() error
WaitForState(ExecutorPhase) error
String() string
}
type AbstractExecutor struct { type AbstractExecutor struct {
Job Job Config common.Config
Job common.Job
} }
type ExecutorPhase string
const (
ExecutorPreparing ExecutorPhase = "Preparing"
ExecutorRunning ExecutorPhase = "Running"
ExecutorSucceeded ExecutorPhase = "Succeeded"
ExecutorFailed ExecutorPhase = "Failed"
ExecutorUnknown ExecutorPhase = "Unknown"
)
func (e *AbstractExecutor) Start() error { func (e *AbstractExecutor) Start() error {
return nil return nil
} }
@@ -38,10 +19,10 @@ func (e *AbstractExecutor) Stop() error {
return nil return nil
} }
func (e *AbstractExecutor) WaitForState(p ExecutorPhase) error { func (e *AbstractExecutor) WaitForState(p common.ExecutorPhase) error {
return nil return nil
} }
func (e *AbstractExecutor) String() string { func (e *AbstractExecutor) String() string {
return fmt.Sprintf("%s %s", e.Job.Command, e.Job.Args) return fmt.Sprintf("%s", e.Job.Args)
} }
+58 -80
View File
@@ -1,82 +1,60 @@
package kubernetes package kubernetes
import ( import (
"time"
"errors"
"fmt" "fmt"
"time"
"k8s.io/kubernetes/pkg/api"
client "k8s.io/kubernetes/pkg/client/unversioned"
log "github.com/Sirupsen/logrus" log "github.com/Sirupsen/logrus"
"k8s.io/kubernetes/pkg/api"
stableApi "k8s.io/kubernetes/pkg/api/unversioned"
"k8s.io/kubernetes/pkg/client/restclient"
client "k8s.io/kubernetes/pkg/client/unversioned"
"github.com/munnerz/plex-elastic-transcoder/executors"
"github.com/munnerz/plex-elastic-transcoder/common" "github.com/munnerz/plex-elastic-transcoder/common"
"github.com/munnerz/plex-elastic-transcoder/executors"
) )
const podBasename = "plex-transcoder"
const kubernetesHost = "10.20.40.254:8080"
const kubernetesNamespace = "plex"
const dockerImage = "registry.marley.xyz/munnerz/plex-new-transcoder"
type KubernetesExecutor struct { type KubernetesExecutor struct {
executors.AbstractExecutor executors.AbstractExecutor
Host string pod *api.Pod
Namespace string client *client.Client
Image string
pod *api.Pod
client *client.Client
} }
func (e *KubernetesExecutor) createPod() *api.Pod { func (e *KubernetesExecutor) createPod() *api.Pod {
fmt.Println("Args: ", e.Job.Args)
return &api.Pod{ return &api.Pod{
TypeMeta: api.TypeMeta{ TypeMeta: stableApi.TypeMeta{
Kind: "Pod", Kind: "Pod",
}, },
ObjectMeta: api.ObjectMeta{ ObjectMeta: api.ObjectMeta{
GenerateName: podBasename, GenerateName: fmt.Sprintf("%s-", e.Config.Kubernetes.PodBasename),
Namespace: e.Namespace, Namespace: e.Config.Kubernetes.Namespace,
}, },
Spec: api.PodSpec{ Spec: api.PodSpec{
Volumes: []api.Volume{ Volumes: []api.Volume{
api.Volume{ api.Volume{
Name: "source-dir", Name: "source-dir",
VolumeSource: api.VolumeSource { VolumeSource: e.Config.Kubernetes.MediaVolumeSource,
NFS: &api.NFSVolumeSource {
Server: "10.12.14.16",
Path: "/tank/media",
ReadOnly: true,
},
},
}, },
api.Volume{ api.Volume{
Name: "transcode-dir", Name: "transcode-dir",
VolumeSource: api.VolumeSource { VolumeSource: e.Config.Kubernetes.TranscodeVolumeSource,
NFS: &api.NFSVolumeSource {
Server: "storage.kube.marley.xyz",
Path: "/shares/containers/plex/transcode",
},
},
}, },
}, },
RestartPolicy: api.RestartPolicyNever, RestartPolicy: api.RestartPolicyNever,
Containers: []api.Container{ Containers: []api.Container{
api.Container{ api.Container{
Name: podBasename, Name: "plex-new-transcoder",
Image: e.Image, Image: e.Config.Kubernetes.Image,
Command: e.Job.Command, Args: e.Job.Args,
Args: e.Job.Args,
VolumeMounts: []api.VolumeMount{ VolumeMounts: []api.VolumeMount{
api.VolumeMount{ api.VolumeMount{
Name: "source-dir", Name: "source-dir",
MountPath: "/data", MountPath: e.Config.Plex.MediaDir,
}, },
api.VolumeMount{ api.VolumeMount{
Name: "transcode-dir", Name: "transcode-dir",
MountPath: "/tmp", MountPath: e.Config.Plex.TranscodeDir,
}, },
}, },
ImagePullPolicy: api.PullAlways, ImagePullPolicy: api.PullAlways,
@@ -87,31 +65,30 @@ func (e *KubernetesExecutor) createPod() *api.Pod {
} }
func (e *KubernetesExecutor) Start() error { func (e *KubernetesExecutor) Start() error {
log.Print("Executing job: ", e) log.Debugf("executing job: %s", e.Job)
e.pod = e.createPod() if len(e.Config.Kubernetes.ProxyURL) > 0 {
e.client = client.NewOrDie(&restclient.Config{
client, err := client.New( Host: e.Config.Kubernetes.ProxyURL,
&client.Config{
Host: e.Host,
}) })
} else {
var err error
e.client, err = client.NewInCluster()
if err != nil {
return fmt.Errorf("failed to create kubernetes client: %s", err.Error())
}
}
pod := e.createPod()
var err error
e.pod, err = e.client.Pods(pod.ObjectMeta.Namespace).Create(pod)
if err != nil { if err != nil {
return err return err
} }
e.client = client log.Debugf("successfully scheduled job on cluster with name: %s", e.pod.ObjectMeta.Name)
log.Print("Creating pod...")
pod, err := e.client.Pods(e.pod.ObjectMeta.Namespace).Create(e.pod)
if err != nil {
return err
}
e.pod = pod
log.Print("Successfully scheduled job on cluster with name: ", e.pod.ObjectMeta.Name)
return nil return nil
} }
@@ -120,35 +97,37 @@ func (e *KubernetesExecutor) Stop() error {
return e.client.Pods(e.pod.ObjectMeta.Namespace).Delete(e.pod.ObjectMeta.Name, nil) return e.client.Pods(e.pod.ObjectMeta.Namespace).Delete(e.pod.ObjectMeta.Name, nil)
} }
func podPhaseToExecutorPhase(in api.PodPhase) executors.ExecutorPhase { func podPhaseToExecutorPhase(in api.PodPhase) common.ExecutorPhase {
switch in { switch in {
case api.PodRunning: case api.PodRunning:
return executors.ExecutorRunning return common.ExecutorPhaseRunning
case api.PodPending: case api.PodPending:
return executors.ExecutorPreparing return common.ExecutorPhasePreparing
case api.PodSucceeded: case api.PodSucceeded:
return executors.ExecutorSucceeded return common.ExecutorPhaseSucceeded
case api.PodFailed: case api.PodFailed:
return executors.ExecutorFailed return common.ExecutorPhaseFailed
default: default:
return executors.ExecutorUnknown return common.ExecutorPhaseUnknown
} }
} }
func (e *KubernetesExecutor) WaitForState(targetState executors.ExecutorPhase) error { func (e *KubernetesExecutor) WaitForState(targetState common.ExecutorPhase) error {
Loop: Loop:
for { for {
pod, err := e.client.Pods(e.pod.ObjectMeta.Namespace).Get(e.pod.ObjectMeta.Name) pod, err := e.client.Pods(e.pod.ObjectMeta.Namespace).Get(e.pod.ObjectMeta.Name)
if err != nil { if err != nil {
return err return err
} }
Switch: log.Debugf("pod state: %s", pod.Status.Phase)
Switch:
switch podPhaseToExecutorPhase(pod.Status.Phase) { switch podPhaseToExecutorPhase(pod.Status.Phase) {
case targetState: case targetState:
break Loop break Loop
case executors.ExecutorFailed: case common.ExecutorPhaseFailed:
return errors.New(fmt.Sprintf("Pod failed whilst waiting for state: %s\nReason: %s", targetState, pod.Status.Reason)) return fmt.Errorf("pod failed whilst waiting for state '%s': %s", targetState, pod.Status.Reason)
default: default:
break Switch break Switch
} }
@@ -159,14 +138,13 @@ func (e *KubernetesExecutor) WaitForState(targetState executors.ExecutorPhase) e
func init() { func init() {
common.RegisterExecutor("kubernetes", common.ExecutorFactory{ common.RegisterExecutor("kubernetes", common.ExecutorFactory{
Create: func(j executors.Job) executors.Executor { Create: func(config common.Config, job common.Job) common.Executor {
e := &KubernetesExecutor{ return &KubernetesExecutor{
Host: kubernetesHost, AbstractExecutor: executors.AbstractExecutor{
Namespace: kubernetesNamespace, Config: config,
Image: dockerImage, Job: job,
},
} }
e.Job = j
return e
}, },
}) })
} }
+73 -44
View File
@@ -1,27 +1,23 @@
package main package main
import ( import (
"io/ioutil"
"os" "os"
"os/signal" "os/signal"
"syscall"
"fmt"
"strings" "strings"
"syscall"
log "github.com/Sirupsen/logrus" log "github.com/Sirupsen/logrus"
"github.com/go-yaml/yaml"
"github.com/munnerz/plex-elastic-transcoder/common" "github.com/munnerz/plex-elastic-transcoder/common"
"github.com/munnerz/plex-elastic-transcoder/executors"
_ "github.com/munnerz/plex-elastic-transcoder/executors/kubernetes" _ "github.com/munnerz/plex-elastic-transcoder/executors/kubernetes"
) )
const ( const configFile = "/etc/plex-elastic-transcoder/config.yaml"
cmdPath = "/plexmediaserver/bootstrap.sh"
logFilePath = "/var/log/plex/plex-elastic-transcoder.log"
plexServerURL = "10.20.40.60:32400"
)
var executor executors.Executor var executor common.Executor
func signals() { func signals() {
// Signal handling // Signal handling
@@ -31,91 +27,124 @@ func signals() {
signal.Notify(c, syscall.SIGTERM) signal.Notify(c, syscall.SIGTERM)
go func() { go func() {
<-c <-c
log.Print("Shutting down...") log.Infof("shutting down...")
if executor != nil { if executor != nil {
log.Print("Terminating running job...") log.Infof("terminating running job...")
err := executor.Stop() err := executor.Stop()
if err != nil { if err != nil {
log.Fatal("Error terminating job: ", err) log.Fatalf("error terminating job: %s", err.Error())
os.Exit(1) os.Exit(1)
} }
log.Print("Successfully terminated running job.") log.Infof("successfully terminated running job")
} }
os.Exit(0) os.Exit(0)
}() }()
} }
func loadConfig() (*common.Config, error) {
data, err := ioutil.ReadFile(configFile)
if err != nil {
return nil, err
}
config := new(common.Config)
err = yaml.Unmarshal(data, config)
return config, err
}
func main() { func main() {
log.SetLevel(log.DebugLevel)
config, err := loadConfig()
if err != nil {
log.Fatalf("error loading config: %s", err.Error())
}
log.Debugf("loaded config: %s", config)
// Setup signals // Setup signals
signals() signals()
// Setup logs // Setup logs
fo, err := os.Create(logFilePath) if len(config.LogFile) > 0 {
fo, err := os.Create(config.LogFile)
if err != nil { if err != nil {
panic(fmt.Sprintf("Error opening log file: %s", err)) log.Fatalf("error opening log file: %s", err)
}
defer func() {
if err := fo.Close(); err != nil {
panic(fmt.Sprintf("Error closing file: %s", err))
} }
}()
log.SetOutput(fo) log.SetOutput(fo)
defer func() {
if err := fo.Close(); err != nil {
log.Fatalf("error closing file: %s", err)
}
}()
}
// Get the arguments passed to Plex New Transcoder // Get the arguments passed to Plex New Transcoder
args := os.Args[1:] args := os.Args[1:]
log.Debugf("executing job with args: %s", args)
wd, _ := os.Getwd() wd, _ := os.Getwd()
for i, arg := range args { for i, arg := range args {
if arg == "-progressurl" { switch arg {
// Change the progress URL to report to about the transcode case "-progressurl":
args[i + 1] = strings.Replace(args[i+1], "127.0.0.1:32400", plexServerURL, 1) log.Debugf("replacing progressURL with: %s", config.Plex.URL)
args[i+1] = strings.Replace(args[i+1], "127.0.0.1:32400", config.Plex.URL, 1)
break
case "-loglevel":
case "loglevel_plex":
args[i+1] = "debug"
} }
} }
args = append([]string{wd}, args...) args = append([]string{wd}, args...)
log.Print("In WD: ", wd) log.Debugf("current working directory: %s", wd)
log.Print("Dispatching job with args: ", args)
job := executors.Job{ job := common.Job{
Command: []string{cmdPath},
Args: args, Args: args,
} }
executor = common.CreateExecutor(job) executor, err = common.CreateExecutor(*config, job)
log.Print("Created executor: ", executor)
err = executor.Start()
if err != nil { if err != nil {
log.Fatal("Job start failed with error: ", err) log.Fatalf("error creating executor: %s", err.Error())
} }
log.Print("Waiting for build pod to enter Running state...") log.Infof("created executor: ", executor)
err = executor.WaitForState(executors.ExecutorRunning) err = executor.Start()
if err != nil {
log.Fatalf("failed to start job: %s", err.Error())
}
log.Print("waiting for build pod to enter Running state...")
err = executor.WaitForState(common.ExecutorPhaseRunning)
if err != nil { if err != nil {
log.Fatal("Error waiting for pod to enter running state: ", err) log.Fatal("Error waiting for pod to enter running state: ", err)
} }
log.Print("Job has started running...") log.Infof("job has started running...")
log.Print("Waiting for job to complete...") log.Infof("waiting for job to complete...")
err = executor.WaitForState(executors.ExecutorSucceeded) err = executor.WaitForState(common.ExecutorPhaseSucceeded)
if err != nil { if err != nil {
log.Fatal("Error waiting for job to complete: ", err) log.Fatalf("error waiting for job to complete: %s", err.Error())
} }
log.Print("Job completed. Destroying pod...") log.Infof("job completed. cleaning up...")
err = executor.Stop() err = executor.Stop()
if err != nil { if err != nil {
log.Fatal("Error stopping job: ", err) log.Fatalf("error stopping job: %s", err.Error())
} }
log.Print("Pod destroyed. Exiting.") log.Printf("cleaned up successfully")
} }