Skip to content

Commit

Permalink
Add status field to FIAAS Application resource (#211)
Browse files Browse the repository at this point in the history
* First changes

* Improving tests

* Fix test

* Fix style errors

* Fix style

* Revert fix style

* Add extra check

* Add subresources to applications

* Try to create subresources

* Remove dummy values

* Fix test, upgrade library

* Upgrade k8s library with last version

* Change status Class

* Fix test

* Fix tests

* Fix watcher test

* Fix style

* Upgrade library

* Update fiaas_deploy_daemon/crd/watcher.py

Co-authored-by: Øyvind Ingebrigtsen Øvergaard <oyvind.overgaard@gmail.com>

* Ack some of the review comments

* Remove status initiation

* Add new test cases, improve asserts

* Improve CRD creation

* Fix style issues

* Add configuration flag to update status in app

* Update fiaas_deploy_daemon/crd/status.py

Co-authored-by: Øyvind Ingebrigtsen Øvergaard <oyvind.overgaard@gmail.com>

* Review last pull requests comments

* Fix issues

* Rollback changes in schema

---------

Co-authored-by: Pau Ortega <pau.ortega@adevinta.com>
Co-authored-by: herodes1991 <eloymaillo@MacBook-Pro-3.local>
Co-authored-by: Øyvind Ingebrigtsen Øvergaard <oyvind.overgaard@gmail.com>
Co-authored-by: herodes1991 <eloymaillo@eloymaillo-MacBook-Pro-JGH5.local>
  • Loading branch information
5 people authored Nov 16, 2023
1 parent eea152a commit 9e796d6
Show file tree
Hide file tree
Showing 18 changed files with 305 additions and 34 deletions.
2 changes: 1 addition & 1 deletion fiaas_deploy_daemon/__init__.py
Original file line number Diff line number Diff line change
Expand Up @@ -205,7 +205,7 @@ def main():
try:
log.info("fiaas-deploy-daemon starting with configuration {!r}".format(cfg))
if cfg.enable_crd_support:
crd_binding = CustomResourceDefinitionBindings(cfg.use_apiextensionsv1_crd)
crd_binding = CustomResourceDefinitionBindings(cfg.use_apiextensionsv1_crd, cfg.include_status_in_app)
else:
crd_binding = DisabledCustomResourceDefinitionBindings()
binding_specs = [
Expand Down
2 changes: 1 addition & 1 deletion fiaas_deploy_daemon/bootstrap/bootstrapper.py
Original file line number Diff line number Diff line change
Expand Up @@ -71,7 +71,7 @@ def __init__(self, config, deploy_queue, spec_factory, lifecycle):
from ..crd.status import connect_signals
else:
raise InvalidConfigurationException("Custom Resource Definition support must be enabled when bootstrapping")
connect_signals()
connect_signals(config.include_status_in_app)
signal(DEPLOY_STATUS_CHANGED).connect(self._store_status)

def run(self):
Expand Down
8 changes: 6 additions & 2 deletions fiaas_deploy_daemon/config.py
Original file line number Diff line number Diff line change
Expand Up @@ -256,7 +256,7 @@ def _parse_args(self, args):
parser.add_argument(
"--deployment-max-surge",
help="maximum number of extra pods that can be scheduled above the desired "
"number of pods during an update",
"number of pods during an update",
default="25%",
type=_int_or_unicode,
)
Expand All @@ -271,7 +271,7 @@ def _parse_args(self, args):
"--ready-check-timeout-multiplier",
type=int,
help="Multiply default ready check timeout (replicas * initialDelaySeconds) with this "
+ "number of seconds (default: %(default)s)",
+ "number of seconds (default: %(default)s)",
default=10,
)
parser.add_argument(
Expand Down Expand Up @@ -302,6 +302,10 @@ def _parse_args(self, args):
action="store_true",
default=False,
)
parser.add_argument(
"--include-status-in-app", help="Include status subresource in application CRD", default=False,
action="store_true"
)
usage_reporting_parser = parser.add_argument_group("Usage Reporting", USAGE_REPORTING_LONG_HELP)
usage_reporting_parser.add_argument(
"--usage-reporting-cluster-name", help="Name of the cluster where the fiaas-deploy-daemon instance resides"
Expand Down
5 changes: 3 additions & 2 deletions fiaas_deploy_daemon/crd/__init__.py
Original file line number Diff line number Diff line change
Expand Up @@ -26,8 +26,9 @@


class CustomResourceDefinitionBindings(pinject.BindingSpec):
def __init__(self, use_apiextensionsv1_crd):
def __init__(self, use_apiextensionsv1_crd, include_status_in_app):
self.use_apiextensionsv1_crd = use_apiextensionsv1_crd
self.include_status_in_app = include_status_in_app

def configure(self, bind, require):
require("config")
Expand All @@ -38,7 +39,7 @@ def configure(self, bind, require):
bind("crd_resources_syncer", to_class=CrdResourcesSyncerApiextensionsV1)
else:
bind("crd_resources_syncer", to_class=CrdResourcesSyncerApiextensionsV1Beta1)
connect_signals()
connect_signals(self.include_status_in_app)


class DisabledCustomResourceDefinitionBindings(pinject.BindingSpec):
Expand Down
38 changes: 32 additions & 6 deletions fiaas_deploy_daemon/crd/crd_resources_syncer_apiextensionsv1.py
Original file line number Diff line number Diff line change
Expand Up @@ -26,6 +26,9 @@
CustomResourceDefinitionVersion,
CustomResourceValidation,
JSONSchemaProps,
CustomResourceSubresources,
JSONSchemaPropsStatusEnabled,
CustomResourceSubresourceStatusEnabled
)

from ..retry import retry_on_upsert_conflict
Expand All @@ -36,12 +39,13 @@
class CrdResourcesSyncerApiextensionsV1(object):
@staticmethod
@retry_on_upsert_conflict
def _create_or_update(kind, plural, short_names, group, schema_properties):
def _create_or_update(kind, plural, short_names, group, open_apiv3_schema, subresources=None):
name = "%s.%s" % (plural, group)
metadata = ObjectMeta(name=name)
names = CustomResourceDefinitionNames(kind=kind, plural=plural, shortNames=short_names)
schema = CustomResourceValidation(openAPIV3Schema=JSONSchemaProps(type="object", properties=schema_properties))
version_v1 = CustomResourceDefinitionVersion(name="v1", served=True, storage=True, schema=schema)
schema = CustomResourceValidation(openAPIV3Schema=open_apiv3_schema)
version_v1 = CustomResourceDefinitionVersion(name="v1", served=True, storage=True, schema=schema,
subresources=subresources)
spec = CustomResourceDefinitionSpec(
group=group,
names=names,
Expand Down Expand Up @@ -94,20 +98,42 @@ def update_crd_resources(cls):
"status": object_with_unknown_fields,
},
},
},
}
},
"status": {
"type": "object",
"properties": {
"result": {
"type": "string"
},
"observedGeneration": {
"type": "integer"
},
"logs": {
"type": "array",
"items": {
"type": "string"
}
},
"deployment_id": {
"type": "string"
}
}
}
}
application_status_schema_properties = {
"result": {"type": "string"},
"logs": {"type": "array", "items": {"type": "string"}},
}
cls._create_or_update(
"Application", "applications", ("app", "fa"), "fiaas.schibsted.io", application_schema_properties
"Application", "applications", ("app", "fa"), "fiaas.schibsted.io",
JSONSchemaPropsStatusEnabled(type="object", properties=application_schema_properties),
CustomResourceSubresources(status=CustomResourceSubresourceStatusEnabled())
)
cls._create_or_update(
"ApplicationStatus",
"application-statuses",
("status", "appstatus", "fs"),
"fiaas.schibsted.io",
application_status_schema_properties,
JSONSchemaProps(type="object", properties=application_status_schema_properties)
)
51 changes: 46 additions & 5 deletions fiaas_deploy_daemon/crd/status.py
Original file line number Diff line number Diff line change
Expand Up @@ -24,9 +24,9 @@
from k8s.client import NotFound
from k8s.models.common import ObjectMeta, OwnerReference

from .types import FiaasApplicationStatus
from .types import FiaasApplication, FiaasApplicationStatus, FiaasApplicationStatusResult
from ..lifecycle import DEPLOY_STATUS_CHANGED, STATUS_STARTED
from ..log_extras import get_final_logs, get_running_logs
from ..log_extras import get_final_error_logs, get_final_logs, get_running_error_logs, get_running_logs
from ..retry import retry_on_upsert_conflict
from ..tools import merge_dicts

Expand All @@ -35,8 +35,11 @@
LOG = logging.getLogger(__name__)


def connect_signals():
signal(DEPLOY_STATUS_CHANGED).connect(_handle_signal)
def connect_signals(include_status_in_app):
if include_status_in_app:
signal(DEPLOY_STATUS_CHANGED).connect(_handle_signal_with_status)
else:
signal(DEPLOY_STATUS_CHANGED).connect(_handle_signal_without_status)


def now():
Expand All @@ -45,16 +48,49 @@ def now():
return now.isoformat()


def _handle_signal(sender, status, subject):
def _handle_signal_without_status(sender, status, subject):
if status == STATUS_STARTED:
status = "RUNNING"
else:
status = status.upper()

_save_status(status, subject)
_cleanup(subject.app_name, subject.namespace)


def _handle_signal_with_status(sender, status, subject):
if status == STATUS_STARTED:
status = "RUNNING"
else:
status = status.upper()

_save_status_inline(status, subject)
_save_status(status, subject)
_cleanup(subject.app_name, subject.namespace)


@retry_on_upsert_conflict
def _save_status_inline(result, subject):
(uid, app_name, namespace, deployment_id, repository, labels, annotations) = subject

app = FiaasApplication.get(app_name, namespace)
generation = int(app.metadata.generation)
try:
application_deployment_id = app.metadata.labels["fiaas/deployment_id"]
except (AttributeError, KeyError, TypeError):
raise ValueError("The Application {} is missing the 'fiaas/deployment_id' label".format(app_name))
# We only want to get error logs here.
if deployment_id == application_deployment_id:
logs = _get_error_logs(app_name, namespace, deployment_id, result)

LOG.info("Saving inline result %s for %s/%s deployment_id=%s generation %s", result, namespace, app_name, deployment_id, generation)
app.status = FiaasApplicationStatusResult(observedGeneration=generation, result=result, logs=logs,
deployment_id=deployment_id)
app.save_status()
else:
LOG.debug("Skipping saving status for application %s with different deployment_id", app_name)


@retry_on_upsert_conflict
def _save_status(result, subject):
(uid, app_name, namespace, deployment_id, repository, labels, annotations) = subject
Expand Down Expand Up @@ -97,6 +133,11 @@ def _get_logs(app_name, namespace, deployment_id, result):
)


def _get_error_logs(app_name, namespace, deployment_id, result):
return get_running_error_logs(app_name, namespace, deployment_id) if result in [u"RUNNING", u"INITIATED"] else \
get_final_error_logs(app_name, namespace, deployment_id)


def _cleanup(app_name=None, namespace=None):
statuses = FiaasApplicationStatus.find(app_name, namespace)

Expand Down
8 changes: 8 additions & 0 deletions fiaas_deploy_daemon/crd/types.py
Original file line number Diff line number Diff line change
Expand Up @@ -41,6 +41,13 @@ class FiaasApplicationSpec(Model):
additional_annotations = Field(AdditionalLabelsOrAnnotations)


class FiaasApplicationStatusResult(Model):
result = RequiredField(six.text_type)
observedGeneration = Field(int, 0) # NOQA
logs = ListField(six.text_type)
deployment_id = Field(six.text_type)


class FiaasApplication(Model):
class Meta:
list_url = "/apis/fiaas.schibsted.io/v1/applications"
Expand All @@ -54,6 +61,7 @@ class Meta:

metadata = Field(ObjectMeta)
spec = Field(FiaasApplicationSpec)
status = Field(FiaasApplicationStatusResult)


class FiaasApplicationStatus(Model):
Expand Down
15 changes: 15 additions & 0 deletions fiaas_deploy_daemon/crd/watcher.py
Original file line number Diff line number Diff line change
Expand Up @@ -68,6 +68,19 @@ def _handle_watch_event(self, event):
else:
raise ValueError("Unknown WatchEvent type {}".format(event.type))

# When we receive update event on FiaasApplication
# don't deploy if it's a status update
def _skip_status_event(self, application):
app_name = application.spec.application
deployment_id = application.metadata.labels["fiaas/deployment_id"]
generation = int(application.metadata.generation)
observed_generation = int(application.status.observedGeneration)
deployment_id_status = application.status.deployment_id
if observed_generation == generation and deployment_id == deployment_id_status:
LOG.debug("Skipping watch event created from status update %s for app %s", deployment_id, app_name)
return True
return False

def _deploy(self, application):
app_name = application.spec.application
LOG.debug("Deploying %s", app_name)
Expand All @@ -76,6 +89,8 @@ def _deploy(self, application):
set_extras(app_name=app_name, namespace=application.metadata.namespace, deployment_id=deployment_id)
except (AttributeError, KeyError, TypeError):
raise ValueError("The Application {} is missing the 'fiaas/deployment_id' label".format(app_name))
if self._skip_status_event(application):
return
if self._already_deployed(app_name, application.metadata.namespace, deployment_id):
LOG.debug("Have already deployed %s for app %s", deployment_id, app_name)
return
Expand Down
33 changes: 33 additions & 0 deletions fiaas_deploy_daemon/log_extras.py
Original file line number Diff line number Diff line change
Expand Up @@ -20,6 +20,7 @@
from collections import defaultdict

_LOGS = defaultdict(list)
_ERROR_LOGS = defaultdict(list)
_LOG_EXTRAS = threading.local()
_LOG_FORMAT = (
"[%(asctime)s|%(levelname)7s] %(message)s " "[%(name)s|%(threadName)s|%(extras_namespace)s/%(extras_app_name)s]"
Expand All @@ -35,6 +36,11 @@ def filter(self, record):
return 1


class ExtraErrorFilter(logging.Filter):
def filter(self, record):
return record.levelno >= logging.ERROR


class StatusFormatter(logging.Formatter):
def __init__(self):
super(StatusFormatter, self).__init__(_LOG_FORMAT, None)
Expand Down Expand Up @@ -74,6 +80,17 @@ def emit(self, record):
append_log(record, self.format(record))


class StatusErrorHandler(logging.Handler):
def __init__(self):
super(StatusErrorHandler, self).__init__(logging.ERROR)
self.addFilter(ExtraErrorFilter())
self.addFilter(ExtraFilter())
self.setFormatter(StatusFormatter())

def emit(self, record):
append_error_log(record, self.format(record))


def set_extras(app_spec=None, app_name=None, namespace=None, deployment_id=None):
if app_spec:
app_name = app_spec.name
Expand Down Expand Up @@ -101,3 +118,19 @@ def append_log(record, message):
if hasattr(record, "extras"):
key = (record.extras.get("app_name"), record.extras.get("namespace"), record.extras.get("deployment_id"))
_LOGS[key].append(message)


def get_running_error_logs(app_name, namespace, deployment_id):
key = (app_name, namespace, deployment_id)
return _ERROR_LOGS.get(key, [])


def get_final_error_logs(app_name, namespace, deployment_id):
key = (app_name, namespace, deployment_id)
return _ERROR_LOGS.pop(key, [])


def append_error_log(record, message):
if hasattr(record, "extras"):
key = (record.extras.get("app_name"), record.extras.get("namespace"), record.extras.get("deployment_id"))
_ERROR_LOGS[key].append(message)
3 changes: 2 additions & 1 deletion fiaas_deploy_daemon/logsetup.py
Original file line number Diff line number Diff line change
Expand Up @@ -22,7 +22,7 @@
import sys

from fiaas_deploy_daemon.log_extras import StatusHandler
from .log_extras import ExtraFilter
from .log_extras import ExtraFilter, StatusErrorHandler


class FiaasFormatter(logging.Formatter):
Expand Down Expand Up @@ -103,6 +103,7 @@ def init_logging(config):
root.setLevel(logging.DEBUG)
root.addHandler(_create_default_handler(config))
root.addHandler(StatusHandler())
root.addHandler(StatusErrorHandler())
_set_special_levels()


Expand Down
7 changes: 4 additions & 3 deletions setup.py
Original file line number Diff line number Diff line change
Expand Up @@ -33,9 +33,9 @@ def read(filename):
"decorator < 5.0.0", # 5.0.0 and later drops py2 support (transitive dep from pinject)
"six >= 1.12.0",
"dnspython == 1.16.0",
"k8s == 0.22.0",
"k8s == 0.23.4",
"appdirs == 1.4.3",
"requests-toolbelt == 0.9.1",
"requests-toolbelt == 0.10.1",
"backoff == 1.8.0",
"py27hash == 1.1.0",
]
Expand All @@ -52,7 +52,8 @@ def read(filename):
]

DEPLOY_REQ = [
"requests == 2.27.1",
"urllib3 == 1.26.17",
"requests == 2.31.0",
"ipaddress == 1.0.22", # Required by requests for resolving IP address in SSL cert
]

Expand Down
Loading

0 comments on commit 9e796d6

Please sign in to comment.