From dea10547eb04d8965151f741b8f296612f15524d Mon Sep 17 00:00:00 2001 From: "Manuel Amador (Rudd-O)" Date: Fri, 6 Oct 2017 16:30:34 +0200 Subject: [PATCH 1/5] Initial commit. --- main.go | 473 ++++++++++++++++++++++++++++++++++++++++++++++++++++++++ 1 file changed, 473 insertions(+) diff --git a/main.go b/main.go index e69de29..8141c17 100644 --- a/main.go +++ b/main.go @@ -0,0 +1,473 @@ +package main + +import ( + "flag" + "fmt" + "io/ioutil" + "log" + "strconv" + "time" + + "github.com/aws/aws-sdk-go/aws/awserr" + "github.com/aws/aws-sdk-go/aws/session" + "github.com/aws/aws-sdk-go/service/ec2" + "github.com/aws/aws-sdk-go/service/ecs" + "github.com/go-yaml/yaml" +) + +var outFile = flag.String("config.write-to", "ecs_file_sd.yml", "path of file to write ECS service discovery information to") +var interval = flag.Duration("config.scrape-interval", 60*time.Second, "interval at which to scrape the AWS API for ECS service discovery information") +var times = flag.Int("config.scrape-times", 0, "how many times to scrape before exiting (0 = infinite)") + +func logError(err error) { + if err != nil { + if aerr, ok := err.(awserr.Error); ok { + switch aerr.Code() { + case ecs.ErrCodeServerException: + log.Println(ecs.ErrCodeServerException, aerr.Error()) + case ecs.ErrCodeClientException: + log.Println(ecs.ErrCodeClientException, aerr.Error()) + case ecs.ErrCodeInvalidParameterException: + log.Println(ecs.ErrCodeInvalidParameterException, aerr.Error()) + case ecs.ErrCodeClusterNotFoundException: + log.Println(ecs.ErrCodeClusterNotFoundException, aerr.Error()) + default: + log.Println(aerr.Error()) + } + } else { + // Print the error, cast err to awserr.Error to get the Code and + // Message from an error. 
+ log.Println(err.Error()) + } + } +} + +func GetClusters(svc *ecs.ECS) (*ecs.ListClustersOutput, error) { + input := &ecs.ListClustersInput{} + output := &ecs.ListClustersOutput{} + for { + myoutput, err := svc.ListClusters(input) + if err != nil { + return nil, err + } + output.ClusterArns = append(output.ClusterArns, myoutput.ClusterArns...) + if myoutput.NextToken == nil { + break + } + input.NextToken = myoutput.NextToken + } + return output, nil +} + +type AugmentedTask struct { + *ecs.Task + TaskDefinition *ecs.TaskDefinition + EC2Instance *ec2.Instance +} + +type PrometheusContainer struct { + ContainerName string + ContainerArn string + DockerImage string + Port int +} + +// ExportedContainers returns a list of []*PrometheusContainer +// enumerating the ports that the container exports for +// Prometheus, one per container, so long as the Docker +// labels in its corresponding container definition have +// a PROMETHEUS_EXPORTER_PORT_INDEX corresponding to an +// existing port mapping index for that container. +// +// Thus, a task with a container definition that has +// ... +// "Name": "mosquitto", +// "DockerLabels": { +// "PROMETHEUS_EXPORTER_PORT_INDEX": "1" +// }, +// ... +// "PortMappings": [ +// { +// "ContainerPort": 1883, +// "HostPort": 0, +// "Protocol": "tcp" +// }, +// { +// "ContainerPort": 9001, +// "HostPort": 0, +// "Protocol": "tcp" +// } +// ], +// ... +// would see its second port (whatever the host port was +// for the running container that got mapped to port 9001) +// exposed as a mapped Prometheus port. 
+func (t *AugmentedTask) ExportedContainers() []*PrometheusContainer { + instances := []*PrometheusContainer{} + for _, c := range t.Containers { + var d *ecs.ContainerDefinition + for _, d = range t.TaskDefinition.ContainerDefinitions { + if *c.Name == *d.Name { + break + } + } + if *c.Name != *d.Name { + continue + } + var v *string + var ok bool + if v, ok = d.DockerLabels["PROMETHEUS_EXPORTER_PORT_INDEX"]; !ok { + continue + } + var err error + var portindex int + if portindex, err = strconv.Atoi(*v); err != nil || portindex < 0 || portindex >= len(d.PortMappings) { + continue + } + instances = append(instances, &PrometheusContainer{*c.Name, *c.ContainerArn, *d.Image, int(*c.NetworkBindings[portindex].HostPort)}) + } + return instances +} + +type PrometheusTaskInfo struct { + Targets []string `yaml:"targets"` + Labels yaml.MapSlice `yaml:"labels"` +} + +func (t *AugmentedTask) ExporterInformation() []*PrometheusTaskInfo { + ret := []*PrometheusTaskInfo{} + instances := t.ExportedContainers() + if len(instances) == 0 { + return ret + } + var ip string + if t.EC2Instance == nil { + return ret + } + if len(t.EC2Instance.NetworkInterfaces) == 0 { + return ret + } + for _, iface := range t.EC2Instance.NetworkInterfaces { + if iface.PrivateIpAddress != nil && *iface.PrivateIpAddress != "" { + ip = *iface.PrivateIpAddress + break + } + } + if ip == "" { + return ret + } + for _, i := range instances { + labels := yaml.MapSlice{} + labels = append(labels, + yaml.MapItem{"task_arn", *t.TaskArn}, + yaml.MapItem{"task_name", *t.TaskDefinition.Family}, + yaml.MapItem{"task_revision", fmt.Sprintf("%d", *t.TaskDefinition.Revision)}, + yaml.MapItem{"task_group", *t.Group}, + yaml.MapItem{"cluster_arn", *t.ClusterArn}, + yaml.MapItem{"container_name", i.ContainerName}, + yaml.MapItem{"container_arn", i.ContainerArn}, + yaml.MapItem{"docker_image", i.DockerImage}, + ) + ret = append(ret, &PrometheusTaskInfo{ + Targets: []string{fmt.Sprintf("%s:%d", ip, i.Port)}, + Labels: 
labels, + }) + } + return ret +} + +func AddTaskDefinitionsOfTasks(svc *ecs.ECS, taskList []*AugmentedTask) ([]*AugmentedTask, error) { + task2def := make(map[string]*ecs.TaskDefinition) + for _, task := range taskList { + task2def[*task.TaskDefinitionArn] = nil + } + + jobs := make(chan *ecs.DescribeTaskDefinitionInput, len(task2def)) + results := make(chan struct { + out *ecs.DescribeTaskDefinitionOutput + err error + }, len(task2def)) + + for w := 1; w <= 4; w++ { + go func() { + for in := range jobs { + out, err := svc.DescribeTaskDefinition(in) + results <- struct { + out *ecs.DescribeTaskDefinitionOutput + err error + }{out, err} + } + }() + } + + for tn := range task2def { + m := string(append([]byte{}, tn...)) + jobs <- &ecs.DescribeTaskDefinitionInput{TaskDefinition: &m} + } + close(jobs) + + var err error + for range task2def { + result := <-results + if result.err != nil { + err = result.err + log.Printf("Error describing task definition: %s", err) + } else { + log.Printf("Described task definition %s", *result.out.TaskDefinition.TaskDefinitionArn) + task2def[*result.out.TaskDefinition.TaskDefinitionArn] = result.out.TaskDefinition + } + } + if err != nil { + return nil, err + } + + for _, task := range taskList { + task.TaskDefinition = task2def[*task.TaskDefinitionArn] + } + return taskList, nil +} + +func StringToStarString(s []string) []*string { + c := make([]*string, 0, len(s)) + for n, _ := range s { + c = append(c, &s[n]) + } + return c +} + +func DescribeInstancesUnpaginated(svcec2 *ec2.EC2, instanceIds []string) ([]*ec2.Instance, error) { + input := &ec2.DescribeInstancesInput{ + InstanceIds: StringToStarString(instanceIds), + } + finalOutput := &ec2.DescribeInstancesOutput{} + for { + output, err := svcec2.DescribeInstances(input) + if err != nil { + return nil, err + } + log.Printf("Described %d EC2 reservations", len(output.Reservations)) + finalOutput.Reservations = append(finalOutput.Reservations, output.Reservations...) 
+ if output.NextToken == nil { + break + } + input.NextToken = output.NextToken + } + result := []*ec2.Instance{} + for _, rsv := range finalOutput.Reservations { + for _, i := range rsv.Instances { + result = append(result, i) + } + } + return result, nil +} + +func AddContainerInstancesToTasks(svc *ecs.ECS, svcec2 *ec2.EC2, taskList []*AugmentedTask) ([]*AugmentedTask, error) { + clusterArnToContainerInstancesArns := make(map[string]map[string]*ecs.ContainerInstance) + for _, task := range taskList { + if _, ok := clusterArnToContainerInstancesArns[*task.ClusterArn]; !ok { + clusterArnToContainerInstancesArns[*task.ClusterArn] = make(map[string]*ecs.ContainerInstance) + } + clusterArnToContainerInstancesArns[*task.ClusterArn][*task.ContainerInstanceArn] = nil + } + + instanceIDToEC2Instance := make(map[string]*ec2.Instance) + for clusterArn, containerInstancesArns := range clusterArnToContainerInstancesArns { + keys := make([]string, 0, len(containerInstancesArns)) + for k := range containerInstancesArns { + keys = append(keys, k) + } + input := &ecs.DescribeContainerInstancesInput{ + Cluster: &clusterArn, + ContainerInstances: StringToStarString(keys), + } + output, err := svc.DescribeContainerInstances(input) + if err != nil { + return nil, err + } + log.Printf("Described %d container instances in cluster %s", len(output.ContainerInstances), clusterArn) + if len(output.Failures) > 0 { + log.Printf("Described %d failures in cluster %s", len(output.Failures), clusterArn) + } + for _, ci := range output.ContainerInstances { + clusterArnToContainerInstancesArns[clusterArn][*ci.ContainerInstanceArn] = ci + instanceIDToEC2Instance[*ci.Ec2InstanceId] = nil + } + } + + keys := make([]string, 0, len(instanceIDToEC2Instance)) + for id, _ := range instanceIDToEC2Instance { + keys = append(keys, id) + } + + instances, err := DescribeInstancesUnpaginated(svcec2, keys) + if err != nil { + return nil, err + } + + for _, i := range instances { + 
instanceIDToEC2Instance[*i.InstanceId] = i + } + + for _, task := range taskList { + containerInstance, ok := clusterArnToContainerInstancesArns[*task.ClusterArn][*task.ContainerInstanceArn] + if !ok || containerInstance == nil { + log.Printf("Cannot find container instance %s in cluster %s", *task.ContainerInstanceArn, *task.ClusterArn) + continue + } + instance, ok := instanceIDToEC2Instance[*containerInstance.Ec2InstanceId] + if !ok || instance == nil { + log.Printf("Cannot find EC2 instance %s", *containerInstance.Ec2InstanceId) + continue + } + task.EC2Instance = instance + } + + return taskList, nil +} + +func GetTasksOfClusters(svc *ecs.ECS, svcec2 *ec2.EC2, clusterArns []*string) ([]*ecs.Task, error) { + jobs := make(chan *string, len(clusterArns)) + results := make(chan struct { + out *ecs.DescribeTasksOutput + err error + }, len(clusterArns)) + + for w := 1; w <= 4; w++ { + go func() { + for clusterArn := range jobs { + input := &ecs.ListTasksInput{ + Cluster: clusterArn, + } + finalOutput := &ecs.DescribeTasksOutput{} + var err error + for { + output, err1 := svc.ListTasks(input) + if err1 != nil { + err = err1 + log.Printf("Error listing tasks of cluster %s: %s", *clusterArn, err) + break + } + if len(output.TaskArns) == 0 { + break + } + log.Printf("Inspected cluster %s, found %d tasks", *clusterArn, len(output.TaskArns)) + descOutput, err2 := svc.DescribeTasks(&ecs.DescribeTasksInput{ + Cluster: clusterArn, + Tasks: output.TaskArns, + }) + if err2 != nil { + err = err2 + log.Printf("Error describing tasks of cluster %s: %s", *clusterArn, err) + break + } + log.Printf("Described %d tasks in cluster %s", len(descOutput.Tasks), *clusterArn) + if len(descOutput.Failures) > 0 { + log.Printf("Described %d failures in cluster %s", len(descOutput.Failures), *clusterArn) + } + finalOutput.Tasks = append(finalOutput.Tasks, descOutput.Tasks...) + finalOutput.Failures = append(finalOutput.Failures, descOutput.Failures...) 
+ if output.NextToken == nil { + break + } + input.NextToken = output.NextToken + } + results <- struct { + out *ecs.DescribeTasksOutput + err error + }{finalOutput, err} + } + }() + } + + for _, clusterArn := range clusterArns { + jobs <- clusterArn + } + close(jobs) + + tasks := []*ecs.Task{} + for range clusterArns { + result := <-results + if result.err != nil { + return nil, result.err + } else { + for _, task := range result.out.Tasks { + tasks = append(tasks, task) + } + } + } + + return tasks, nil +} + +func GetAugmentedTasks(svc *ecs.ECS, svcec2 *ec2.EC2, clusterArns []*string) ([]*AugmentedTask, error) { + simpleTasks, err := GetTasksOfClusters(svc, svcec2, clusterArns) + if err != nil { + return nil, err + } + + tasks := []*AugmentedTask{} + for _, t := range simpleTasks { + tasks = append(tasks, &AugmentedTask{t, nil, nil}) + } + + tasks, err = AddTaskDefinitionsOfTasks(svc, tasks) + if err != nil { + return nil, err + } + + tasks, err = AddContainerInstancesToTasks(svc, svcec2, tasks) + if err != nil { + return nil, err + } + + return tasks, nil +} + +func main() { + flag.Parse() + sess := session.New() + svc := ecs.New(sess) + svcec2 := ec2.New(sess) + work := func() { + clusters, err := GetClusters(svc) + if err != nil { + logError(err) + return + } + tasks, err := GetAugmentedTasks(svc, svcec2, clusters.ClusterArns) + if err != nil { + logError(err) + return + } + infos := []*PrometheusTaskInfo{} + for _, t := range tasks { + info := t.ExporterInformation() + infos = append(infos, info...) 
+ } + m, err := yaml.Marshal(infos) + if err != nil { + logError(err) + return + } + log.Printf("Writing %d discovered exporters to %s", len(infos), *outFile) + err = ioutil.WriteFile(*outFile, m, 0644) + if err != nil { + logError(err) + return + } + } + s := time.NewTimer(1 * time.Millisecond) + t := time.NewTicker(*interval) + n := *times + for { + select { + case <-s.C: + case <-t.C: + } + work() + n = n - 1 + if *times > 0 && n == 0 { + break + } + } +} From ab88ec8866038b295667dfac345c85a867653047 Mon Sep 17 00:00:00 2001 From: "Manuel Amador (Rudd-O)" Date: Fri, 6 Oct 2017 16:36:55 +0200 Subject: [PATCH 2/5] License file added and program text licensed. --- LICENSE | 174 ++++++++++++++++++++++++++++++++++++++++++++++++++++++++ main.go | 14 +++++ 2 files changed, 188 insertions(+) create mode 100644 LICENSE diff --git a/LICENSE b/LICENSE new file mode 100644 index 0000000..78eb129 --- /dev/null +++ b/LICENSE @@ -0,0 +1,174 @@ +Apache License +Version 2.0, January 2004 +http://www.apache.org/licenses/ + +TERMS AND CONDITIONS FOR USE, REPRODUCTION, AND DISTRIBUTION + +1. Definitions. + +"License" shall mean the terms and conditions for use, reproduction, +and distribution as defined by Sections 1 through 9 of this document. + +"Licensor" shall mean the copyright owner or entity authorized by +the copyright owner that is granting the License. + +"Legal Entity" shall mean the union of the acting entity and all +other entities that control, are controlled by, or are under common +control with that entity. For the purposes of this definition, +"control" means (i) the power, direct or indirect, to cause the +direction or management of such entity, whether by contract or +otherwise, or (ii) ownership of fifty percent (50%) or more of the +outstanding shares, or (iii) beneficial ownership of such entity. + +"You" (or "Your") shall mean an individual or Legal Entity +exercising permissions granted by this License. 
+ +"Source" form shall mean the preferred form for making modifications, +including but not limited to software source code, documentation +source, and configuration files. + +"Object" form shall mean any form resulting from mechanical +transformation or translation of a Source form, including but +not limited to compiled object code, generated documentation, +and conversions to other media types. + +"Work" shall mean the work of authorship, whether in Source or +Object form, made available under the License, as indicated by a +copyright notice that is included in or attached to the work +(an example is provided in the Appendix below). + +"Derivative Works" shall mean any work, whether in Source or Object +form, that is based on (or derived from) the Work and for which the +editorial revisions, annotations, elaborations, or other modifications +represent, as a whole, an original work of authorship. For the purposes +of this License, Derivative Works shall not include works that remain +separable from, or merely link (or bind by name) to the interfaces of, +the Work and Derivative Works thereof. + +"Contribution" shall mean any work of authorship, including +the original version of the Work and any modifications or additions +to that Work or Derivative Works thereof, that is intentionally +submitted to Licensor for inclusion in the Work by the copyright owner +or by an individual or Legal Entity authorized to submit on behalf of +the copyright owner. 
For the purposes of this definition, "submitted" +means any form of electronic, verbal, or written communication sent +to the Licensor or its representatives, including but not limited to +communication on electronic mailing lists, source code control systems, +and issue tracking systems that are managed by, or on behalf of, the +Licensor for the purpose of discussing and improving the Work, but +excluding communication that is conspicuously marked or otherwise +designated in writing by the copyright owner as "Not a Contribution." + +"Contributor" shall mean Licensor and any individual or Legal Entity +on behalf of whom a Contribution has been received by Licensor and +subsequently incorporated within the Work. + +2. Grant of Copyright License. Subject to the terms and conditions of +this License, each Contributor hereby grants to You a perpetual, +worldwide, non-exclusive, no-charge, royalty-free, irrevocable +copyright license to reproduce, prepare Derivative Works of, +publicly display, publicly perform, sublicense, and distribute the +Work and such Derivative Works in Source or Object form. + +3. Grant of Patent License. Subject to the terms and conditions of +this License, each Contributor hereby grants to You a perpetual, +worldwide, non-exclusive, no-charge, royalty-free, irrevocable +(except as stated in this section) patent license to make, have made, +use, offer to sell, sell, import, and otherwise transfer the Work, +where such license applies only to those patent claims licensable +by such Contributor that are necessarily infringed by their +Contribution(s) alone or by combination of their Contribution(s) +with the Work to which such Contribution(s) was submitted. 
If You +institute patent litigation against any entity (including a +cross-claim or counterclaim in a lawsuit) alleging that the Work +or a Contribution incorporated within the Work constitutes direct +or contributory patent infringement, then any patent licenses +granted to You under this License for that Work shall terminate +as of the date such litigation is filed. + +4. Redistribution. You may reproduce and distribute copies of the +Work or Derivative Works thereof in any medium, with or without +modifications, and in Source or Object form, provided that You +meet the following conditions: + +(a) You must give any other recipients of the Work or +Derivative Works a copy of this License; and + +(b) You must cause any modified files to carry prominent notices +stating that You changed the files; and + +(c) You must retain, in the Source form of any Derivative Works +that You distribute, all copyright, patent, trademark, and +attribution notices from the Source form of the Work, +excluding those notices that do not pertain to any part of +the Derivative Works; and + +(d) If the Work includes a "NOTICE" text file as part of its +distribution, then any Derivative Works that You distribute must +include a readable copy of the attribution notices contained +within such NOTICE file, excluding those notices that do not +pertain to any part of the Derivative Works, in at least one +of the following places: within a NOTICE text file distributed +as part of the Derivative Works; within the Source form or +documentation, if provided along with the Derivative Works; or, +within a display generated by the Derivative Works, if and +wherever such third-party notices normally appear. The contents +of the NOTICE file are for informational purposes only and +do not modify the License. 
You may add Your own attribution +notices within Derivative Works that You distribute, alongside +or as an addendum to the NOTICE text from the Work, provided +that such additional attribution notices cannot be construed +as modifying the License. + +You may add Your own copyright statement to Your modifications and +may provide additional or different license terms and conditions +for use, reproduction, or distribution of Your modifications, or +for any such Derivative Works as a whole, provided Your use, +reproduction, and distribution of the Work otherwise complies with +the conditions stated in this License. + +5. Submission of Contributions. Unless You explicitly state otherwise, +any Contribution intentionally submitted for inclusion in the Work +by You to the Licensor shall be under the terms and conditions of +this License, without any additional terms or conditions. +Notwithstanding the above, nothing herein shall supersede or modify +the terms of any separate license agreement you may have executed +with Licensor regarding such Contributions. + +6. Trademarks. This License does not grant permission to use the trade +names, trademarks, service marks, or product names of the Licensor, +except as required for reasonable and customary use in describing the +origin of the Work and reproducing the content of the NOTICE file. + +7. Disclaimer of Warranty. Unless required by applicable law or +agreed to in writing, Licensor provides the Work (and each +Contributor provides its Contributions) on an "AS IS" BASIS, +WITHOUT WARRANTIES OR CONDITIONS OF ANY KIND, either express or +implied, including, without limitation, any warranties or conditions +of TITLE, NON-INFRINGEMENT, MERCHANTABILITY, or FITNESS FOR A +PARTICULAR PURPOSE. You are solely responsible for determining the +appropriateness of using or redistributing the Work and assume any +risks associated with Your exercise of permissions under this License. + +8. Limitation of Liability. 
In no event and under no legal theory, +whether in tort (including negligence), contract, or otherwise, +unless required by applicable law (such as deliberate and grossly +negligent acts) or agreed to in writing, shall any Contributor be +liable to You for damages, including any direct, indirect, special, +incidental, or consequential damages of any character arising as a +result of this License or out of the use or inability to use the +Work (including but not limited to damages for loss of goodwill, +work stoppage, computer failure or malfunction, or any and all +other commercial damages or losses), even if such Contributor +has been advised of the possibility of such damages. + +9. Accepting Warranty or Additional Liability. While redistributing +the Work or Derivative Works thereof, You may choose to offer, +and charge a fee for, acceptance of support, warranty, indemnity, +or other liability obligations and/or rights consistent with this +License. However, in accepting such obligations, You may act only +on Your own behalf and on Your sole responsibility, not on behalf +of any other Contributor, and only if You agree to indemnify, +defend, and hold each Contributor harmless for any liability +incurred by, or claims asserted against, such Contributor by reason +of your accepting any such warranty or additional liability. diff --git a/main.go b/main.go index 8141c17..69b1c4f 100644 --- a/main.go +++ b/main.go @@ -1,3 +1,17 @@ +// Copyright 2017 Teralytics. +// +// Licensed under the Apache License, Version 2.0 (the "License"); +// you may not use this file except in compliance with the License. +// You may obtain a copy of the License at +// +// http://www.apache.org/licenses/LICENSE-2.0 +// +// Unless required by applicable law or agreed to in writing, software +// distributed under the License is distributed on an "AS IS" BASIS, +// WITHOUT WARRANTIES OR CONDITIONS OF ANY KIND, either express or implied. 
+// See the License for the specific language governing permissions and +// limitations under the License. + package main import ( From f2ac57d3fb724ade9157a1a94d97f53d87923d6e Mon Sep 17 00:00:00 2001 From: "Manuel Amador (Rudd-O)" Date: Fri, 6 Oct 2017 16:55:22 +0200 Subject: [PATCH 3/5] Add README.md --- README.md | 55 +++++++++++++++++++++++++++++++++++++++++++++++++++++++ 1 file changed, 55 insertions(+) create mode 100644 README.md diff --git a/README.md b/README.md new file mode 100644 index 0000000..2edea62 --- /dev/null +++ b/README.md @@ -0,0 +1,55 @@ +# Prometheus Amazon ECS discovery + +Prometheus has native Amazon EC2 discovery capabilities, but it does +not have the capacity to discover ECS instances that can be scraped +by Prometheus. This program is a Prometheus File Service Discovery +(`file_sd_config`) integration that bridges said gap. + +## Help + +Run `prometheus-ecs-discovery -?` to get information. + +The command line parameters that can be used are: + +* -config.scrape-interval (duration): interval at which to scrape + the AWS API for ECS service discovery information (default 1m0s) +* -config.scrape-times (int): how many times to scrape before + exiting (0 = infinite) +* -config.write-to (string): path of file to write ECS service + discovery information to (default "ecs_file_sd.yml") + +## Usage + +First, build this program using the usual `go get` mechanism. + +Then, run it as follows: + +* Ensure the program can write to a directory readable by + your Prometheus master instance(s). +* Export the usual `AWS_REGION`, `AWS_ACCESS_KEY_ID` and + `AWS_SECRET_ACCESS_KEY` into the environment of the program, + making sure that the keys have access to the EC2 / ECS APIs + (IAM policies should include `ECS:ListClusters`, + `ECS:ListTasks`, `ECS:DescribeTask`, `EC2:DescribeInstances`, + `ECS:DescribeContainerInstances`, `ECS:DescribeTasks`, + `ECS:DescribeTaskDefinition`). 
+* Start the program, using the command line option + `-config.write-to` to point the program to a file inside a + folder that your Prometheus master can read from. +* Add a `file_sd_config` to your Prometheus master: + +``` +scrape_configs: +- job_name: ecs + file_sd_configs: + - files: + - /path/to/ecs_file_sd.yml + refresh_interval: 10m +``` + +That's it. You should begin seeing the program scraping the +AWS APIs and writing the discovery file (by default it does +that every minute, and by default Prometheus will reload the +file the minute it is written). After reloading your Prometheus +master configuration, Prometheus will begin picking up, from +the discovery file, the new targets that it must scrape. From 0f1322439c9f864518c18d072e288ed5f6cbf126 Mon Sep 17 00:00:00 2001 From: "Manuel Amador (Rudd-O)" Date: Tue, 10 Oct 2017 16:37:04 +0200 Subject: [PATCH 4/5] Add documentation and simplify code. --- main.go | 118 ++++++++++++++++++++++++++++++++++---------------------- 1 file changed, 71 insertions(+), 47 deletions(-) diff --git a/main.go b/main.go index 69b1c4f..b4e26e0 100644 --- a/main.go +++ b/main.go @@ -33,6 +33,8 @@ var outFile = flag.String("config.write-to", "ecs_file_sd.yml", "path of file to var interval = flag.Duration("config.scrape-interval", 60*time.Second, "interval at which to scrape the AWS API for ECS service discovery information") var times = flag.Int("config.scrape-times", 0, "how many times to scrape before exiting (0 = infinite)") +// logError is a convenience function that decodes all possible ECS +// errors and displays them to standard error. func logError(err error) { if err != nil { if aerr, ok := err.(awserr.Error); ok { @@ -56,6 +58,8 @@ func logError(err error) { } } +// GetClusters retrieves a list of *ClusterArns from Amazon ECS, +// dealing with the mandatory pagination as needed. 
func GetClusters(svc *ecs.ECS) (*ecs.ListClustersOutput, error) { input := &ecs.ListClustersInput{} output := &ecs.ListClustersOutput{} @@ -73,12 +77,18 @@ func GetClusters(svc *ecs.ECS) (*ecs.ListClustersOutput, error) { return output, nil } +// AugmentedTask represents an ECS task augmented with an extra set of +// structures representing the ECS task definition and EC2 instance +// associated with the running task. type AugmentedTask struct { *ecs.Task TaskDefinition *ecs.TaskDefinition EC2Instance *ec2.Instance } +// PrometheusContainer represents a tuple of information +// (Container Name, Container ARN, Docker image, Port) +// extracted from a task and its task definition. type PrometheusContainer struct { ContainerName string ContainerArn string @@ -86,12 +96,20 @@ type PrometheusContainer struct { Port int } -// ExportedContainers returns a list of []*PrometheusContainer -// enumerating the ports that the container exports for -// Prometheus, one per container, so long as the Docker -// labels in its corresponding container definition have -// a PROMETHEUS_EXPORTER_PORT_INDEX corresponding to an -// existing port mapping index for that container. +// PrometheusTaskInfo is the final structure that will be +// output as a Prometheus file service discovery config. +type PrometheusTaskInfo struct { + Targets []string `yaml:"targets"` + Labels yaml.MapSlice `yaml:"labels"` +} + +// ExporterInformation returns a list of []*PrometheusTaskInfo +// enumerating the IPs and ports that the task's containers export +// to Prometheus (one per container), so long as the Docker +// labels in its corresponding container definition for the +// container in the task have a PROMETHEUS_EXPORTER_PORT_INDEX +// corresponding to an existing port mapping index for that +// container. // // Thus, a task with a container definition that has // ... 
@@ -116,44 +134,8 @@ type PrometheusContainer struct { // would see its second port (whatever the host port was // for the running container that got mapped to port 9001) // exposed as a mapped Prometheus port. -func (t *AugmentedTask) ExportedContainers() []*PrometheusContainer { - instances := []*PrometheusContainer{} - for _, c := range t.Containers { - var d *ecs.ContainerDefinition - for _, d = range t.TaskDefinition.ContainerDefinitions { - if *c.Name == *d.Name { - break - } - } - if *c.Name != *d.Name { - continue - } - var v *string - var ok bool - if v, ok = d.DockerLabels["PROMETHEUS_EXPORTER_PORT_INDEX"]; !ok { - continue - } - var err error - var portindex int - if portindex, err = strconv.Atoi(*v); err != nil || portindex < 0 || portindex >= len(d.PortMappings) { - continue - } - instances = append(instances, &PrometheusContainer{*c.Name, *c.ContainerArn, *d.Image, int(*c.NetworkBindings[portindex].HostPort)}) - } - return instances -} - -type PrometheusTaskInfo struct { - Targets []string `yaml:"targets"` - Labels yaml.MapSlice `yaml:"labels"` -} - func (t *AugmentedTask) ExporterInformation() []*PrometheusTaskInfo { ret := []*PrometheusTaskInfo{} - instances := t.ExportedContainers() - if len(instances) == 0 { - return ret - } var ip string if t.EC2Instance == nil { return ret @@ -170,7 +152,36 @@ func (t *AugmentedTask) ExporterInformation() []*PrometheusTaskInfo { if ip == "" { return ret } - for _, i := range instances { + for _, i := range t.Containers { + // Let's go over the containers to see which ones are defined + // and have a Prometheus exported port. + var d *ecs.ContainerDefinition + for _, d = range t.TaskDefinition.ContainerDefinitions { + if *i.Name == *d.Name { + // Aha, the container definition matches this container we + // are inspecting, stop the loop cos we got the D now. + break + } + } + if d == nil || *i.Name != *d.Name { + // Nope, no match, this container cannot be exported. We continue. 
+ continue + } + var v *string + var ok bool + if v, ok = d.DockerLabels["PROMETHEUS_EXPORTER_PORT_INDEX"]; !ok { + // Nope, no Prometheus-exported port in this container def. + // This container is no good. We continue. + continue + } + var err error + var portindex int + if portindex, err = strconv.Atoi(*v); err != nil || portindex < 0 || portindex >= len(d.PortMappings) || portindex >= len(i.NetworkBindings) { + // This container has an invalid port definition. + // This container is no good. We continue. + continue + } + port := int(*i.NetworkBindings[portindex].HostPort) labels := yaml.MapSlice{} labels = append(labels, yaml.MapItem{"task_arn", *t.TaskArn}, @@ -178,18 +189,20 @@ func (t *AugmentedTask) ExporterInformation() []*PrometheusTaskInfo { yaml.MapItem{"task_revision", fmt.Sprintf("%d", *t.TaskDefinition.Revision)}, yaml.MapItem{"task_group", *t.Group}, yaml.MapItem{"cluster_arn", *t.ClusterArn}, - yaml.MapItem{"container_name", i.ContainerName}, - yaml.MapItem{"container_arn", i.ContainerArn}, - yaml.MapItem{"docker_image", i.DockerImage}, + yaml.MapItem{"container_name", *i.Name}, + yaml.MapItem{"container_arn", *i.ContainerArn}, + yaml.MapItem{"docker_image", *d.Image}, ) ret = append(ret, &PrometheusTaskInfo{ - Targets: []string{fmt.Sprintf("%s:%d", ip, i.Port)}, + Targets: []string{fmt.Sprintf("%s:%d", ip, port)}, Labels: labels, }) } return ret } +// AddTaskDefinitionsOfTasks adds to each Task the TaskDefinition +// corresponding to it. func AddTaskDefinitionsOfTasks(svc *ecs.ECS, taskList []*AugmentedTask) ([]*AugmentedTask, error) { task2def := make(map[string]*ecs.TaskDefinition) for _, task := range taskList { @@ -241,6 +254,9 @@ func AddTaskDefinitionsOfTasks(svc *ecs.ECS, taskList []*AugmentedTask) ([]*Augm return taskList, nil } +// StringToStarString converts a list of strings to a list of +// pointers to strings, which is a common requirement of the +// Amazon API. 
func StringToStarString(s []string) []*string { c := make([]*string, 0, len(s)) for n, _ := range s { @@ -249,6 +265,9 @@ func StringToStarString(s []string) []*string { return c } +// DescribeInstancesUnpaginated describes a list of EC2 instances. +// It is unpaginated from the caller's point of view: it follows +// NextToken until every page of reservations has been collected. func DescribeInstancesUnpaginated(svcec2 *ec2.EC2, instanceIds []string) ([]*ec2.Instance, error) { input := &ec2.DescribeInstancesInput{ InstanceIds: StringToStarString(instanceIds), @@ -275,6 +294,8 @@ func DescribeInstancesUnpaginated(svcec2 *ec2.EC2, instanceIds []string) ([]*ec2 return result, nil } +// AddContainerInstancesToTasks adds to each Task the EC2 instance +// running its containers. func AddContainerInstancesToTasks(svc *ecs.ECS, svcec2 *ec2.EC2, taskList []*AugmentedTask) ([]*AugmentedTask, error) { clusterArnToContainerInstancesArns := make(map[string]map[string]*ecs.ContainerInstance) for _, task := range taskList { @@ -339,6 +360,7 @@ func AddContainerInstancesToTasks(svc *ecs.ECS, svcec2 *ec2.EC2, taskList []*Aug return taskList, nil } +// GetTasksOfClusters returns the ECS tasks running in a list of Clusters. func GetTasksOfClusters(svc *ecs.ECS, svcec2 *ec2.EC2, clusterArns []*string) ([]*ecs.Task, error) { jobs := make(chan *string, len(clusterArns)) results := make(chan struct { @@ -413,6 +435,8 @@ func GetTasksOfClusters(svc *ecs.ECS, svcec2 *ec2.EC2, clusterArns []*string) ([ return tasks, nil } +// GetAugmentedTasks gets the fully populated AugmentedTasks running on +// a list of Clusters. func GetAugmentedTasks(svc *ecs.ECS, svcec2 *ec2.EC2, clusterArns []*string) ([]*AugmentedTask, error) { simpleTasks, err := GetTasksOfClusters(svc, svcec2, clusterArns) if err != nil { From 7a01a516aa5d0f08a5d591336b99b391f39cfc4e Mon Sep 17 00:00:00 2001 From: "Manuel Amador (Rudd-O)" Date: Tue, 10 Oct 2017 16:37:46 +0200 Subject: [PATCH 5/5] Fix cast review comment. 
--- main.go | 4 ++-- 1 file changed, 2 insertions(+), 2 deletions(-) diff --git a/main.go b/main.go index b4e26e0..e8cc90a 100644 --- a/main.go +++ b/main.go @@ -38,6 +38,8 @@ var times = flag.Int("config.scrape-times", 0, "how many times to scrape before func logError(err error) { if err != nil { if aerr, ok := err.(awserr.Error); ok { + // Print the error, cast err to awserr.Error to get the Code and + // Message from an error. switch aerr.Code() { case ecs.ErrCodeServerException: log.Println(ecs.ErrCodeServerException, aerr.Error()) @@ -51,8 +53,6 @@ func logError(err error) { log.Println(aerr.Error()) } } else { - // Print the error, cast err to awserr.Error to get the Code and - // Message from an error. log.Println(err.Error()) } }