bootstrapper: collect journald logs on failure (#1618)

This commit is contained in:
miampf 2023-05-30 11:47:36 +00:00 committed by GitHub
parent 60b125cb59
commit 8686c5e7e2
No known key found for this signature in database
GPG Key ID: 4AEE18F83AFDEB23
12 changed files with 922 additions and 153 deletions

View File

@ -140,9 +140,12 @@ type InitResponse struct {
sizeCache protoimpl.SizeCache
unknownFields protoimpl.UnknownFields
Kubeconfig []byte `protobuf:"bytes,1,opt,name=kubeconfig,proto3" json:"kubeconfig,omitempty"`
OwnerId []byte `protobuf:"bytes,2,opt,name=owner_id,json=ownerId,proto3" json:"owner_id,omitempty"`
ClusterId []byte `protobuf:"bytes,3,opt,name=cluster_id,json=clusterId,proto3" json:"cluster_id,omitempty"`
// Types that are assignable to Kind:
//
// *InitResponse_InitSuccess
// *InitResponse_InitFailure
// *InitResponse_Log
Kind isInitResponse_Kind `protobuf_oneof:"kind"`
}
func (x *InitResponse) Reset() {
@ -177,27 +180,213 @@ func (*InitResponse) Descriptor() ([]byte, []int) {
return file_bootstrapper_initproto_init_proto_rawDescGZIP(), []int{1}
}
func (x *InitResponse) GetKubeconfig() []byte {
func (m *InitResponse) GetKind() isInitResponse_Kind {
if m != nil {
return m.Kind
}
return nil
}
func (x *InitResponse) GetInitSuccess() *InitSuccessResponse {
if x, ok := x.GetKind().(*InitResponse_InitSuccess); ok {
return x.InitSuccess
}
return nil
}
func (x *InitResponse) GetInitFailure() *InitFailureResponse {
if x, ok := x.GetKind().(*InitResponse_InitFailure); ok {
return x.InitFailure
}
return nil
}
func (x *InitResponse) GetLog() *LogResponseType {
if x, ok := x.GetKind().(*InitResponse_Log); ok {
return x.Log
}
return nil
}
type isInitResponse_Kind interface {
isInitResponse_Kind()
}
type InitResponse_InitSuccess struct {
InitSuccess *InitSuccessResponse `protobuf:"bytes,1,opt,name=init_success,json=initSuccess,proto3,oneof"`
}
type InitResponse_InitFailure struct {
InitFailure *InitFailureResponse `protobuf:"bytes,2,opt,name=init_failure,json=initFailure,proto3,oneof"`
}
type InitResponse_Log struct {
Log *LogResponseType `protobuf:"bytes,3,opt,name=log,proto3,oneof"`
}
func (*InitResponse_InitSuccess) isInitResponse_Kind() {}
func (*InitResponse_InitFailure) isInitResponse_Kind() {}
func (*InitResponse_Log) isInitResponse_Kind() {}
type InitSuccessResponse struct {
state protoimpl.MessageState
sizeCache protoimpl.SizeCache
unknownFields protoimpl.UnknownFields
Kubeconfig []byte `protobuf:"bytes,1,opt,name=kubeconfig,proto3" json:"kubeconfig,omitempty"`
OwnerId []byte `protobuf:"bytes,2,opt,name=owner_id,json=ownerId,proto3" json:"owner_id,omitempty"`
ClusterId []byte `protobuf:"bytes,3,opt,name=cluster_id,json=clusterId,proto3" json:"cluster_id,omitempty"`
}
func (x *InitSuccessResponse) Reset() {
*x = InitSuccessResponse{}
if protoimpl.UnsafeEnabled {
mi := &file_bootstrapper_initproto_init_proto_msgTypes[2]
ms := protoimpl.X.MessageStateOf(protoimpl.Pointer(x))
ms.StoreMessageInfo(mi)
}
}
func (x *InitSuccessResponse) String() string {
return protoimpl.X.MessageStringOf(x)
}
func (*InitSuccessResponse) ProtoMessage() {}
func (x *InitSuccessResponse) ProtoReflect() protoreflect.Message {
mi := &file_bootstrapper_initproto_init_proto_msgTypes[2]
if protoimpl.UnsafeEnabled && x != nil {
ms := protoimpl.X.MessageStateOf(protoimpl.Pointer(x))
if ms.LoadMessageInfo() == nil {
ms.StoreMessageInfo(mi)
}
return ms
}
return mi.MessageOf(x)
}
// Deprecated: Use InitSuccessResponse.ProtoReflect.Descriptor instead.
func (*InitSuccessResponse) Descriptor() ([]byte, []int) {
return file_bootstrapper_initproto_init_proto_rawDescGZIP(), []int{2}
}
func (x *InitSuccessResponse) GetKubeconfig() []byte {
if x != nil {
return x.Kubeconfig
}
return nil
}
func (x *InitResponse) GetOwnerId() []byte {
func (x *InitSuccessResponse) GetOwnerId() []byte {
if x != nil {
return x.OwnerId
}
return nil
}
func (x *InitResponse) GetClusterId() []byte {
func (x *InitSuccessResponse) GetClusterId() []byte {
if x != nil {
return x.ClusterId
}
return nil
}
type InitFailureResponse struct {
state protoimpl.MessageState
sizeCache protoimpl.SizeCache
unknownFields protoimpl.UnknownFields
Error string `protobuf:"bytes,1,opt,name=error,proto3" json:"error,omitempty"`
}
func (x *InitFailureResponse) Reset() {
*x = InitFailureResponse{}
if protoimpl.UnsafeEnabled {
mi := &file_bootstrapper_initproto_init_proto_msgTypes[3]
ms := protoimpl.X.MessageStateOf(protoimpl.Pointer(x))
ms.StoreMessageInfo(mi)
}
}
func (x *InitFailureResponse) String() string {
return protoimpl.X.MessageStringOf(x)
}
func (*InitFailureResponse) ProtoMessage() {}
func (x *InitFailureResponse) ProtoReflect() protoreflect.Message {
mi := &file_bootstrapper_initproto_init_proto_msgTypes[3]
if protoimpl.UnsafeEnabled && x != nil {
ms := protoimpl.X.MessageStateOf(protoimpl.Pointer(x))
if ms.LoadMessageInfo() == nil {
ms.StoreMessageInfo(mi)
}
return ms
}
return mi.MessageOf(x)
}
// Deprecated: Use InitFailureResponse.ProtoReflect.Descriptor instead.
func (*InitFailureResponse) Descriptor() ([]byte, []int) {
return file_bootstrapper_initproto_init_proto_rawDescGZIP(), []int{3}
}
func (x *InitFailureResponse) GetError() string {
if x != nil {
return x.Error
}
return ""
}
type LogResponseType struct {
state protoimpl.MessageState
sizeCache protoimpl.SizeCache
unknownFields protoimpl.UnknownFields
Log []byte `protobuf:"bytes,1,opt,name=log,proto3" json:"log,omitempty"`
}
func (x *LogResponseType) Reset() {
*x = LogResponseType{}
if protoimpl.UnsafeEnabled {
mi := &file_bootstrapper_initproto_init_proto_msgTypes[4]
ms := protoimpl.X.MessageStateOf(protoimpl.Pointer(x))
ms.StoreMessageInfo(mi)
}
}
func (x *LogResponseType) String() string {
return protoimpl.X.MessageStringOf(x)
}
func (*LogResponseType) ProtoMessage() {}
func (x *LogResponseType) ProtoReflect() protoreflect.Message {
mi := &file_bootstrapper_initproto_init_proto_msgTypes[4]
if protoimpl.UnsafeEnabled && x != nil {
ms := protoimpl.X.MessageStateOf(protoimpl.Pointer(x))
if ms.LoadMessageInfo() == nil {
ms.StoreMessageInfo(mi)
}
return ms
}
return mi.MessageOf(x)
}
// Deprecated: Use LogResponseType.ProtoReflect.Descriptor instead.
func (*LogResponseType) Descriptor() ([]byte, []int) {
return file_bootstrapper_initproto_init_proto_rawDescGZIP(), []int{4}
}
func (x *LogResponseType) GetLog() []byte {
if x != nil {
return x.Log
}
return nil
}
type KubernetesComponent struct {
state protoimpl.MessageState
sizeCache protoimpl.SizeCache
@ -212,7 +401,7 @@ type KubernetesComponent struct {
func (x *KubernetesComponent) Reset() {
*x = KubernetesComponent{}
if protoimpl.UnsafeEnabled {
mi := &file_bootstrapper_initproto_init_proto_msgTypes[2]
mi := &file_bootstrapper_initproto_init_proto_msgTypes[5]
ms := protoimpl.X.MessageStateOf(protoimpl.Pointer(x))
ms.StoreMessageInfo(mi)
}
@ -225,7 +414,7 @@ func (x *KubernetesComponent) String() string {
func (*KubernetesComponent) ProtoMessage() {}
func (x *KubernetesComponent) ProtoReflect() protoreflect.Message {
mi := &file_bootstrapper_initproto_init_proto_msgTypes[2]
mi := &file_bootstrapper_initproto_init_proto_msgTypes[5]
if protoimpl.UnsafeEnabled && x != nil {
ms := protoimpl.X.MessageStateOf(protoimpl.Pointer(x))
if ms.LoadMessageInfo() == nil {
@ -238,7 +427,7 @@ func (x *KubernetesComponent) ProtoReflect() protoreflect.Message {
// Deprecated: Use KubernetesComponent.ProtoReflect.Descriptor instead.
func (*KubernetesComponent) Descriptor() ([]byte, []int) {
return file_bootstrapper_initproto_init_proto_rawDescGZIP(), []int{2}
return file_bootstrapper_initproto_init_proto_rawDescGZIP(), []int{5}
}
func (x *KubernetesComponent) GetUrl() string {
@ -300,29 +489,47 @@ var file_bootstrapper_initproto_init_proto_rawDesc = []byte{
0x65, 0x74, 0x18, 0x10, 0x20, 0x01, 0x28, 0x0c, 0x52, 0x0a, 0x69, 0x6e, 0x69, 0x74, 0x53, 0x65,
0x63, 0x72, 0x65, 0x74, 0x12, 0x21, 0x0a, 0x0c, 0x63, 0x6c, 0x75, 0x73, 0x74, 0x65, 0x72, 0x5f,
0x6e, 0x61, 0x6d, 0x65, 0x18, 0x11, 0x20, 0x01, 0x28, 0x09, 0x52, 0x0b, 0x63, 0x6c, 0x75, 0x73,
0x74, 0x65, 0x72, 0x4e, 0x61, 0x6d, 0x65, 0x22, 0x68, 0x0a, 0x0c, 0x49, 0x6e, 0x69, 0x74, 0x52,
0x65, 0x73, 0x70, 0x6f, 0x6e, 0x73, 0x65, 0x12, 0x1e, 0x0a, 0x0a, 0x6b, 0x75, 0x62, 0x65, 0x63,
0x6f, 0x6e, 0x66, 0x69, 0x67, 0x18, 0x01, 0x20, 0x01, 0x28, 0x0c, 0x52, 0x0a, 0x6b, 0x75, 0x62,
0x65, 0x63, 0x6f, 0x6e, 0x66, 0x69, 0x67, 0x12, 0x19, 0x0a, 0x08, 0x6f, 0x77, 0x6e, 0x65, 0x72,
0x5f, 0x69, 0x64, 0x18, 0x02, 0x20, 0x01, 0x28, 0x0c, 0x52, 0x07, 0x6f, 0x77, 0x6e, 0x65, 0x72,
0x49, 0x64, 0x12, 0x1d, 0x0a, 0x0a, 0x63, 0x6c, 0x75, 0x73, 0x74, 0x65, 0x72, 0x5f, 0x69, 0x64,
0x18, 0x03, 0x20, 0x01, 0x28, 0x0c, 0x52, 0x09, 0x63, 0x6c, 0x75, 0x73, 0x74, 0x65, 0x72, 0x49,
0x64, 0x22, 0x78, 0x0a, 0x13, 0x4b, 0x75, 0x62, 0x65, 0x72, 0x6e, 0x65, 0x74, 0x65, 0x73, 0x43,
0x6f, 0x6d, 0x70, 0x6f, 0x6e, 0x65, 0x6e, 0x74, 0x12, 0x10, 0x0a, 0x03, 0x75, 0x72, 0x6c, 0x18,
0x01, 0x20, 0x01, 0x28, 0x09, 0x52, 0x03, 0x75, 0x72, 0x6c, 0x12, 0x12, 0x0a, 0x04, 0x68, 0x61,
0x73, 0x68, 0x18, 0x02, 0x20, 0x01, 0x28, 0x09, 0x52, 0x04, 0x68, 0x61, 0x73, 0x68, 0x12, 0x21,
0x0a, 0x0c, 0x69, 0x6e, 0x73, 0x74, 0x61, 0x6c, 0x6c, 0x5f, 0x70, 0x61, 0x74, 0x68, 0x18, 0x03,
0x20, 0x01, 0x28, 0x09, 0x52, 0x0b, 0x69, 0x6e, 0x73, 0x74, 0x61, 0x6c, 0x6c, 0x50, 0x61, 0x74,
0x68, 0x12, 0x18, 0x0a, 0x07, 0x65, 0x78, 0x74, 0x72, 0x61, 0x63, 0x74, 0x18, 0x04, 0x20, 0x01,
0x28, 0x08, 0x52, 0x07, 0x65, 0x78, 0x74, 0x72, 0x61, 0x63, 0x74, 0x32, 0x34, 0x0a, 0x03, 0x41,
0x50, 0x49, 0x12, 0x2d, 0x0a, 0x04, 0x49, 0x6e, 0x69, 0x74, 0x12, 0x11, 0x2e, 0x69, 0x6e, 0x69,
0x74, 0x2e, 0x49, 0x6e, 0x69, 0x74, 0x52, 0x65, 0x71, 0x75, 0x65, 0x73, 0x74, 0x1a, 0x12, 0x2e,
0x69, 0x6e, 0x69, 0x74, 0x2e, 0x49, 0x6e, 0x69, 0x74, 0x52, 0x65, 0x73, 0x70, 0x6f, 0x6e, 0x73,
0x65, 0x42, 0x40, 0x5a, 0x3e, 0x67, 0x69, 0x74, 0x68, 0x75, 0x62, 0x2e, 0x63, 0x6f, 0x6d, 0x2f,
0x65, 0x64, 0x67, 0x65, 0x6c, 0x65, 0x73, 0x73, 0x73, 0x79, 0x73, 0x2f, 0x63, 0x6f, 0x6e, 0x73,
0x74, 0x65, 0x6c, 0x6c, 0x61, 0x74, 0x69, 0x6f, 0x6e, 0x2f, 0x76, 0x32, 0x2f, 0x62, 0x6f, 0x6f,
0x74, 0x73, 0x74, 0x72, 0x61, 0x70, 0x70, 0x65, 0x72, 0x2f, 0x69, 0x6e, 0x69, 0x74, 0x70, 0x72,
0x6f, 0x74, 0x6f, 0x62, 0x06, 0x70, 0x72, 0x6f, 0x74, 0x6f, 0x33,
0x74, 0x65, 0x72, 0x4e, 0x61, 0x6d, 0x65, 0x22, 0xc1, 0x01, 0x0a, 0x0c, 0x49, 0x6e, 0x69, 0x74,
0x52, 0x65, 0x73, 0x70, 0x6f, 0x6e, 0x73, 0x65, 0x12, 0x3e, 0x0a, 0x0c, 0x69, 0x6e, 0x69, 0x74,
0x5f, 0x73, 0x75, 0x63, 0x63, 0x65, 0x73, 0x73, 0x18, 0x01, 0x20, 0x01, 0x28, 0x0b, 0x32, 0x19,
0x2e, 0x69, 0x6e, 0x69, 0x74, 0x2e, 0x49, 0x6e, 0x69, 0x74, 0x53, 0x75, 0x63, 0x63, 0x65, 0x73,
0x73, 0x52, 0x65, 0x73, 0x70, 0x6f, 0x6e, 0x73, 0x65, 0x48, 0x00, 0x52, 0x0b, 0x69, 0x6e, 0x69,
0x74, 0x53, 0x75, 0x63, 0x63, 0x65, 0x73, 0x73, 0x12, 0x3e, 0x0a, 0x0c, 0x69, 0x6e, 0x69, 0x74,
0x5f, 0x66, 0x61, 0x69, 0x6c, 0x75, 0x72, 0x65, 0x18, 0x02, 0x20, 0x01, 0x28, 0x0b, 0x32, 0x19,
0x2e, 0x69, 0x6e, 0x69, 0x74, 0x2e, 0x49, 0x6e, 0x69, 0x74, 0x46, 0x61, 0x69, 0x6c, 0x75, 0x72,
0x65, 0x52, 0x65, 0x73, 0x70, 0x6f, 0x6e, 0x73, 0x65, 0x48, 0x00, 0x52, 0x0b, 0x69, 0x6e, 0x69,
0x74, 0x46, 0x61, 0x69, 0x6c, 0x75, 0x72, 0x65, 0x12, 0x29, 0x0a, 0x03, 0x6c, 0x6f, 0x67, 0x18,
0x03, 0x20, 0x01, 0x28, 0x0b, 0x32, 0x15, 0x2e, 0x69, 0x6e, 0x69, 0x74, 0x2e, 0x4c, 0x6f, 0x67,
0x52, 0x65, 0x73, 0x70, 0x6f, 0x6e, 0x73, 0x65, 0x54, 0x79, 0x70, 0x65, 0x48, 0x00, 0x52, 0x03,
0x6c, 0x6f, 0x67, 0x42, 0x06, 0x0a, 0x04, 0x6b, 0x69, 0x6e, 0x64, 0x22, 0x6f, 0x0a, 0x13, 0x49,
0x6e, 0x69, 0x74, 0x53, 0x75, 0x63, 0x63, 0x65, 0x73, 0x73, 0x52, 0x65, 0x73, 0x70, 0x6f, 0x6e,
0x73, 0x65, 0x12, 0x1e, 0x0a, 0x0a, 0x6b, 0x75, 0x62, 0x65, 0x63, 0x6f, 0x6e, 0x66, 0x69, 0x67,
0x18, 0x01, 0x20, 0x01, 0x28, 0x0c, 0x52, 0x0a, 0x6b, 0x75, 0x62, 0x65, 0x63, 0x6f, 0x6e, 0x66,
0x69, 0x67, 0x12, 0x19, 0x0a, 0x08, 0x6f, 0x77, 0x6e, 0x65, 0x72, 0x5f, 0x69, 0x64, 0x18, 0x02,
0x20, 0x01, 0x28, 0x0c, 0x52, 0x07, 0x6f, 0x77, 0x6e, 0x65, 0x72, 0x49, 0x64, 0x12, 0x1d, 0x0a,
0x0a, 0x63, 0x6c, 0x75, 0x73, 0x74, 0x65, 0x72, 0x5f, 0x69, 0x64, 0x18, 0x03, 0x20, 0x01, 0x28,
0x0c, 0x52, 0x09, 0x63, 0x6c, 0x75, 0x73, 0x74, 0x65, 0x72, 0x49, 0x64, 0x22, 0x2b, 0x0a, 0x13,
0x49, 0x6e, 0x69, 0x74, 0x46, 0x61, 0x69, 0x6c, 0x75, 0x72, 0x65, 0x52, 0x65, 0x73, 0x70, 0x6f,
0x6e, 0x73, 0x65, 0x12, 0x14, 0x0a, 0x05, 0x65, 0x72, 0x72, 0x6f, 0x72, 0x18, 0x01, 0x20, 0x01,
0x28, 0x09, 0x52, 0x05, 0x65, 0x72, 0x72, 0x6f, 0x72, 0x22, 0x23, 0x0a, 0x0f, 0x4c, 0x6f, 0x67,
0x52, 0x65, 0x73, 0x70, 0x6f, 0x6e, 0x73, 0x65, 0x54, 0x79, 0x70, 0x65, 0x12, 0x10, 0x0a, 0x03,
0x6c, 0x6f, 0x67, 0x18, 0x01, 0x20, 0x01, 0x28, 0x0c, 0x52, 0x03, 0x6c, 0x6f, 0x67, 0x22, 0x78,
0x0a, 0x13, 0x4b, 0x75, 0x62, 0x65, 0x72, 0x6e, 0x65, 0x74, 0x65, 0x73, 0x43, 0x6f, 0x6d, 0x70,
0x6f, 0x6e, 0x65, 0x6e, 0x74, 0x12, 0x10, 0x0a, 0x03, 0x75, 0x72, 0x6c, 0x18, 0x01, 0x20, 0x01,
0x28, 0x09, 0x52, 0x03, 0x75, 0x72, 0x6c, 0x12, 0x12, 0x0a, 0x04, 0x68, 0x61, 0x73, 0x68, 0x18,
0x02, 0x20, 0x01, 0x28, 0x09, 0x52, 0x04, 0x68, 0x61, 0x73, 0x68, 0x12, 0x21, 0x0a, 0x0c, 0x69,
0x6e, 0x73, 0x74, 0x61, 0x6c, 0x6c, 0x5f, 0x70, 0x61, 0x74, 0x68, 0x18, 0x03, 0x20, 0x01, 0x28,
0x09, 0x52, 0x0b, 0x69, 0x6e, 0x73, 0x74, 0x61, 0x6c, 0x6c, 0x50, 0x61, 0x74, 0x68, 0x12, 0x18,
0x0a, 0x07, 0x65, 0x78, 0x74, 0x72, 0x61, 0x63, 0x74, 0x18, 0x04, 0x20, 0x01, 0x28, 0x08, 0x52,
0x07, 0x65, 0x78, 0x74, 0x72, 0x61, 0x63, 0x74, 0x32, 0x36, 0x0a, 0x03, 0x41, 0x50, 0x49, 0x12,
0x2f, 0x0a, 0x04, 0x49, 0x6e, 0x69, 0x74, 0x12, 0x11, 0x2e, 0x69, 0x6e, 0x69, 0x74, 0x2e, 0x49,
0x6e, 0x69, 0x74, 0x52, 0x65, 0x71, 0x75, 0x65, 0x73, 0x74, 0x1a, 0x12, 0x2e, 0x69, 0x6e, 0x69,
0x74, 0x2e, 0x49, 0x6e, 0x69, 0x74, 0x52, 0x65, 0x73, 0x70, 0x6f, 0x6e, 0x73, 0x65, 0x30, 0x01,
0x42, 0x40, 0x5a, 0x3e, 0x67, 0x69, 0x74, 0x68, 0x75, 0x62, 0x2e, 0x63, 0x6f, 0x6d, 0x2f, 0x65,
0x64, 0x67, 0x65, 0x6c, 0x65, 0x73, 0x73, 0x73, 0x79, 0x73, 0x2f, 0x63, 0x6f, 0x6e, 0x73, 0x74,
0x65, 0x6c, 0x6c, 0x61, 0x74, 0x69, 0x6f, 0x6e, 0x2f, 0x76, 0x32, 0x2f, 0x62, 0x6f, 0x6f, 0x74,
0x73, 0x74, 0x72, 0x61, 0x70, 0x70, 0x65, 0x72, 0x2f, 0x69, 0x6e, 0x69, 0x74, 0x70, 0x72, 0x6f,
0x74, 0x6f, 0x62, 0x06, 0x70, 0x72, 0x6f, 0x74, 0x6f, 0x33,
}
var (
@ -337,21 +544,27 @@ func file_bootstrapper_initproto_init_proto_rawDescGZIP() []byte {
return file_bootstrapper_initproto_init_proto_rawDescData
}
var file_bootstrapper_initproto_init_proto_msgTypes = make([]protoimpl.MessageInfo, 3)
var file_bootstrapper_initproto_init_proto_msgTypes = make([]protoimpl.MessageInfo, 6)
var file_bootstrapper_initproto_init_proto_goTypes = []interface{}{
(*InitRequest)(nil), // 0: init.InitRequest
(*InitResponse)(nil), // 1: init.InitResponse
(*KubernetesComponent)(nil), // 2: init.KubernetesComponent
(*InitSuccessResponse)(nil), // 2: init.InitSuccessResponse
(*InitFailureResponse)(nil), // 3: init.InitFailureResponse
(*LogResponseType)(nil), // 4: init.LogResponseType
(*KubernetesComponent)(nil), // 5: init.KubernetesComponent
}
var file_bootstrapper_initproto_init_proto_depIdxs = []int32{
2, // 0: init.InitRequest.kubernetes_components:type_name -> init.KubernetesComponent
0, // 1: init.API.Init:input_type -> init.InitRequest
1, // 2: init.API.Init:output_type -> init.InitResponse
2, // [2:3] is the sub-list for method output_type
1, // [1:2] is the sub-list for method input_type
1, // [1:1] is the sub-list for extension type_name
1, // [1:1] is the sub-list for extension extendee
0, // [0:1] is the sub-list for field type_name
5, // 0: init.InitRequest.kubernetes_components:type_name -> init.KubernetesComponent
2, // 1: init.InitResponse.init_success:type_name -> init.InitSuccessResponse
3, // 2: init.InitResponse.init_failure:type_name -> init.InitFailureResponse
4, // 3: init.InitResponse.log:type_name -> init.LogResponseType
0, // 4: init.API.Init:input_type -> init.InitRequest
1, // 5: init.API.Init:output_type -> init.InitResponse
5, // [5:6] is the sub-list for method output_type
4, // [4:5] is the sub-list for method input_type
4, // [4:4] is the sub-list for extension type_name
4, // [4:4] is the sub-list for extension extendee
0, // [0:4] is the sub-list for field type_name
}
func init() { file_bootstrapper_initproto_init_proto_init() }
@ -385,6 +598,42 @@ func file_bootstrapper_initproto_init_proto_init() {
}
}
file_bootstrapper_initproto_init_proto_msgTypes[2].Exporter = func(v interface{}, i int) interface{} {
switch v := v.(*InitSuccessResponse); i {
case 0:
return &v.state
case 1:
return &v.sizeCache
case 2:
return &v.unknownFields
default:
return nil
}
}
file_bootstrapper_initproto_init_proto_msgTypes[3].Exporter = func(v interface{}, i int) interface{} {
switch v := v.(*InitFailureResponse); i {
case 0:
return &v.state
case 1:
return &v.sizeCache
case 2:
return &v.unknownFields
default:
return nil
}
}
file_bootstrapper_initproto_init_proto_msgTypes[4].Exporter = func(v interface{}, i int) interface{} {
switch v := v.(*LogResponseType); i {
case 0:
return &v.state
case 1:
return &v.sizeCache
case 2:
return &v.unknownFields
default:
return nil
}
}
file_bootstrapper_initproto_init_proto_msgTypes[5].Exporter = func(v interface{}, i int) interface{} {
switch v := v.(*KubernetesComponent); i {
case 0:
return &v.state
@ -397,13 +646,18 @@ func file_bootstrapper_initproto_init_proto_init() {
}
}
}
file_bootstrapper_initproto_init_proto_msgTypes[1].OneofWrappers = []interface{}{
(*InitResponse_InitSuccess)(nil),
(*InitResponse_InitFailure)(nil),
(*InitResponse_Log)(nil),
}
type x struct{}
out := protoimpl.TypeBuilder{
File: protoimpl.DescBuilder{
GoPackagePath: reflect.TypeOf(x{}).PkgPath(),
RawDescriptor: file_bootstrapper_initproto_init_proto_rawDesc,
NumEnums: 0,
NumMessages: 3,
NumMessages: 6,
NumExtensions: 0,
NumServices: 1,
},
@ -429,7 +683,7 @@ const _ = grpc.SupportPackageIsVersion6
//
// For semantics around ctx use and closing/ending streaming RPCs, please refer to https://godoc.org/google.golang.org/grpc#ClientConn.NewStream.
type APIClient interface {
Init(ctx context.Context, in *InitRequest, opts ...grpc.CallOption) (*InitResponse, error)
Init(ctx context.Context, in *InitRequest, opts ...grpc.CallOption) (API_InitClient, error)
}
type aPIClient struct {
@ -440,59 +694,86 @@ func NewAPIClient(cc grpc.ClientConnInterface) APIClient {
return &aPIClient{cc}
}
func (c *aPIClient) Init(ctx context.Context, in *InitRequest, opts ...grpc.CallOption) (*InitResponse, error) {
out := new(InitResponse)
err := c.cc.Invoke(ctx, "/init.API/Init", in, out, opts...)
func (c *aPIClient) Init(ctx context.Context, in *InitRequest, opts ...grpc.CallOption) (API_InitClient, error) {
stream, err := c.cc.NewStream(ctx, &_API_serviceDesc.Streams[0], "/init.API/Init", opts...)
if err != nil {
return nil, err
}
return out, nil
x := &aPIInitClient{stream}
if err := x.ClientStream.SendMsg(in); err != nil {
return nil, err
}
if err := x.ClientStream.CloseSend(); err != nil {
return nil, err
}
return x, nil
}
type API_InitClient interface {
Recv() (*InitResponse, error)
grpc.ClientStream
}
type aPIInitClient struct {
grpc.ClientStream
}
func (x *aPIInitClient) Recv() (*InitResponse, error) {
m := new(InitResponse)
if err := x.ClientStream.RecvMsg(m); err != nil {
return nil, err
}
return m, nil
}
// APIServer is the server API for API service.
type APIServer interface {
Init(context.Context, *InitRequest) (*InitResponse, error)
Init(*InitRequest, API_InitServer) error
}
// UnimplementedAPIServer can be embedded to have forward compatible implementations.
type UnimplementedAPIServer struct {
}
func (*UnimplementedAPIServer) Init(context.Context, *InitRequest) (*InitResponse, error) {
return nil, status.Errorf(codes.Unimplemented, "method Init not implemented")
func (*UnimplementedAPIServer) Init(*InitRequest, API_InitServer) error {
return status.Errorf(codes.Unimplemented, "method Init not implemented")
}
func RegisterAPIServer(s *grpc.Server, srv APIServer) {
s.RegisterService(&_API_serviceDesc, srv)
}
func _API_Init_Handler(srv interface{}, ctx context.Context, dec func(interface{}) error, interceptor grpc.UnaryServerInterceptor) (interface{}, error) {
in := new(InitRequest)
if err := dec(in); err != nil {
return nil, err
func _API_Init_Handler(srv interface{}, stream grpc.ServerStream) error {
m := new(InitRequest)
if err := stream.RecvMsg(m); err != nil {
return err
}
if interceptor == nil {
return srv.(APIServer).Init(ctx, in)
}
info := &grpc.UnaryServerInfo{
Server: srv,
FullMethod: "/init.API/Init",
}
handler := func(ctx context.Context, req interface{}) (interface{}, error) {
return srv.(APIServer).Init(ctx, req.(*InitRequest))
}
return interceptor(ctx, in, info, handler)
return srv.(APIServer).Init(m, &aPIInitServer{stream})
}
type API_InitServer interface {
Send(*InitResponse) error
grpc.ServerStream
}
type aPIInitServer struct {
grpc.ServerStream
}
func (x *aPIInitServer) Send(m *InitResponse) error {
return x.ServerStream.SendMsg(m)
}
var _API_serviceDesc = grpc.ServiceDesc{
ServiceName: "init.API",
HandlerType: (*APIServer)(nil),
Methods: []grpc.MethodDesc{
Methods: []grpc.MethodDesc{},
Streams: []grpc.StreamDesc{
{
MethodName: "Init",
Handler: _API_Init_Handler,
StreamName: "Init",
Handler: _API_Init_Handler,
ServerStreams: true,
},
},
Streams: []grpc.StreamDesc{},
Metadata: "bootstrapper/initproto/init.proto",
}

View File

@ -5,7 +5,7 @@ package init;
option go_package = "github.com/edgelesssys/constellation/v2/bootstrapper/initproto";
service API {
rpc Init(InitRequest) returns (InitResponse);
rpc Init(InitRequest) returns (stream InitResponse);
}
message InitRequest {
@ -29,11 +29,27 @@ message InitRequest {
}
message InitResponse {
oneof kind {
InitSuccessResponse init_success = 1;
InitFailureResponse init_failure = 2;
LogResponseType log = 3;
}
}
message InitSuccessResponse {
bytes kubeconfig = 1;
bytes owner_id = 2;
bytes cluster_id = 3;
}
message InitFailureResponse {
string error = 1;
}
message LogResponseType {
bytes log = 1;
}
message KubernetesComponent {
string url = 1;
string hash = 2;

View File

@ -9,6 +9,7 @@ go_library(
deps = [
"//bootstrapper/initproto",
"//bootstrapper/internal/diskencryption",
"//bootstrapper/internal/journald",
"//internal/atls",
"//internal/attestation",
"//internal/crypto",
@ -49,6 +50,7 @@ go_test(
"@com_github_spf13_afero//:afero",
"@com_github_stretchr_testify//assert",
"@com_github_stretchr_testify//require",
"@org_golang_google_grpc//:go_default_library",
"@org_golang_x_crypto//bcrypt",
"@org_uber_go_goleak//:goleak",
],

View File

@ -18,8 +18,11 @@ If a call from the CLI is received, the InitServer bootstraps the Kubernetes clu
package initserver
import (
"bufio"
"context"
"errors"
"fmt"
"io"
"net"
"strings"
"sync"
@ -27,6 +30,7 @@ import (
"github.com/edgelesssys/constellation/v2/bootstrapper/initproto"
"github.com/edgelesssys/constellation/v2/bootstrapper/internal/diskencryption"
"github.com/edgelesssys/constellation/v2/bootstrapper/internal/journald"
"github.com/edgelesssys/constellation/v2/internal/atls"
"github.com/edgelesssys/constellation/v2/internal/attestation"
"github.com/edgelesssys/constellation/v2/internal/crypto"
@ -62,8 +66,12 @@ type Server struct {
initSecretHash []byte
kmsURI string
log *logger.Logger
journaldCollector journaldCollection
initproto.UnimplementedAPIServer
}
@ -79,14 +87,20 @@ func New(ctx context.Context, lock locker, kube ClusterInitializer, issuer atls.
return nil, fmt.Errorf("init secret hash is empty")
}
jctlCollector, err := journald.NewCollector(ctx)
if err != nil {
return nil, err
}
server := &Server{
nodeLock: lock,
disk: diskencryption.New(),
initializer: kube,
fileHandler: fh,
issuer: issuer,
log: log,
initSecretHash: initSecretHash,
nodeLock: lock,
disk: diskencryption.New(),
initializer: kube,
fileHandler: fh,
issuer: issuer,
log: log,
initSecretHash: initSecretHash,
journaldCollector: jctlCollector,
}
grpcServer := grpc.NewServer(
@ -113,32 +127,46 @@ func (s *Server) Serve(ip, port string, cleaner cleaner) error {
}
// Init initializes the cluster.
func (s *Server) Init(ctx context.Context, req *initproto.InitRequest) (*initproto.InitResponse, error) {
func (s *Server) Init(req *initproto.InitRequest, stream initproto.API_InitServer) (err error) {
// Acquire lock to prevent shutdown while Init is still running
s.shutdownLock.RLock()
defer s.shutdownLock.RUnlock()
log := s.log.With(zap.String("peer", grpclog.PeerAddrFromContext(ctx)))
log := s.log.With(zap.String("peer", grpclog.PeerAddrFromContext(stream.Context())))
log.Infof("Init called")
s.kmsURI = req.KmsUri
if err := bcrypt.CompareHashAndPassword(s.initSecretHash, req.InitSecret); err != nil {
return nil, status.Errorf(codes.Internal, "invalid init secret %s", err)
if e := s.sendLogsWithMessage(stream, status.Errorf(codes.Internal, "invalid init secret %s", err)); e != nil {
err = errors.Join(err, e)
}
return err
}
cloudKms, err := kmssetup.KMS(ctx, req.StorageUri, req.KmsUri)
cloudKms, err := kmssetup.KMS(stream.Context(), req.StorageUri, req.KmsUri)
if err != nil {
return nil, fmt.Errorf("creating kms client: %w", err)
if e := s.sendLogsWithMessage(stream, status.Errorf(codes.Internal, "creating kms client: %s", err)); e != nil {
err = errors.Join(err, e)
}
return err
}
// generate values for cluster attestation
measurementSalt, clusterID, err := deriveMeasurementValues(ctx, cloudKms)
measurementSalt, clusterID, err := deriveMeasurementValues(stream.Context(), cloudKms)
if err != nil {
return nil, status.Errorf(codes.Internal, "deriving measurement values: %s", err)
if e := s.sendLogsWithMessage(stream, status.Errorf(codes.Internal, "deriving measurement values: %s", err)); e != nil {
err = errors.Join(err, e)
}
return err
}
nodeLockAcquired, err := s.nodeLock.TryLockOnce(clusterID)
if err != nil {
return nil, status.Errorf(codes.Internal, "locking node: %s", err)
if e := s.sendLogsWithMessage(stream, status.Errorf(codes.Internal, "locking node: %s", err)); e != nil {
err = errors.Join(err, e)
}
return err
}
if !nodeLockAcquired {
// The join client seems to already have a connection to an
@ -147,7 +175,13 @@ func (s *Server) Init(ctx context.Context, req *initproto.InitRequest) (*initpro
//
// The server stops itself after the current call is done.
log.Warnf("Node is already in a join process")
return nil, status.Error(codes.FailedPrecondition, "node is already being activated")
err = status.Error(codes.FailedPrecondition, "node is already being activated")
if e := s.sendLogsWithMessage(stream, err); e != nil {
err = errors.Join(err, e)
}
return err
}
// Stop the join client -> We no longer expect to join an existing cluster,
@ -155,8 +189,11 @@ func (s *Server) Init(ctx context.Context, req *initproto.InitRequest) (*initpro
// Any errors following this call will result in a failed node that may not join any cluster.
s.cleaner.Clean()
if err := s.setupDisk(ctx, cloudKms); err != nil {
return nil, status.Errorf(codes.Internal, "setting up disk: %s", err)
if err := s.setupDisk(stream.Context(), cloudKms); err != nil {
if e := s.sendLogsWithMessage(stream, status.Errorf(codes.Internal, "setting up disk: %s", err)); e != nil {
err = errors.Join(err, e)
}
return err
}
state := nodestate.NodeState{
@ -164,7 +201,10 @@ func (s *Server) Init(ctx context.Context, req *initproto.InitRequest) (*initpro
MeasurementSalt: measurementSalt,
}
if err := state.ToFile(s.fileHandler); err != nil {
return nil, status.Errorf(codes.Internal, "persisting node state: %s", err)
if e := s.sendLogsWithMessage(stream, status.Errorf(codes.Internal, "persisting node state: %s", err)); e != nil {
err = errors.Join(err, e)
}
return err
}
clusterName := req.ClusterName
@ -172,7 +212,7 @@ func (s *Server) Init(ctx context.Context, req *initproto.InitRequest) (*initpro
clusterName = "constellation"
}
kubeconfig, err := s.initializer.InitCluster(ctx,
kubeconfig, err := s.initializer.InitCluster(stream.Context(),
req.CloudServiceAccountUri,
req.KubernetesVersion,
clusterName,
@ -183,14 +223,67 @@ func (s *Server) Init(ctx context.Context, req *initproto.InitRequest) (*initpro
s.log,
)
if err != nil {
return nil, status.Errorf(codes.Internal, "initializing cluster: %s", err)
if e := s.sendLogsWithMessage(stream, status.Errorf(codes.Internal, "initializing cluster: %s", err)); e != nil {
err = errors.Join(err, e)
}
return err
}
log.Infof("Init succeeded")
return &initproto.InitResponse{
Kubeconfig: kubeconfig,
ClusterId: clusterID,
}, nil
successMessage := &initproto.InitResponse_InitSuccess{
InitSuccess: &initproto.InitSuccessResponse{
Kubeconfig: kubeconfig,
ClusterId: clusterID,
},
}
return stream.Send(&initproto.InitResponse{Kind: successMessage})
}
func (s *Server) sendLogsWithMessage(stream initproto.API_InitServer, message error) error {
// send back the error message
if err := stream.Send(&initproto.InitResponse{
Kind: &initproto.InitResponse_InitFailure{
InitFailure: &initproto.InitFailureResponse{Error: message.Error()},
},
}); err != nil {
return err
}
logPipe, err := s.journaldCollector.Start()
if err != nil {
return status.Errorf(codes.Internal, "failed starting the log collector: %s", err)
}
reader := bufio.NewReader(logPipe)
buffer := make([]byte, 1024)
for {
n, err := io.ReadFull(reader, buffer)
buffer = buffer[:n] // cap the buffer so that we don't have a bunch of nullbytes at the end
if err != nil {
if err == io.EOF {
break
}
if err != io.ErrUnexpectedEOF {
return status.Errorf(codes.Internal, "failed to read from pipe: %s", err)
}
}
err = stream.Send(&initproto.InitResponse{
Kind: &initproto.InitResponse_Log{
Log: &initproto.LogResponseType{
Log: buffer,
},
},
})
if err != nil {
return status.Errorf(codes.Internal, "failed to send chunk: %s", err)
}
}
return nil
}
// Stop stops the initialization server gracefully.
@ -291,3 +384,9 @@ type MetadataAPI interface {
// InitSecretHash returns the initSecretHash of the instance.
InitSecretHash(ctx context.Context) ([]byte, error)
}
// journaldCollection is an interface for collecting journald logs.
type journaldCollection interface {
// Start starts the journald collector and returns a pipe from which the system logs can be read.
Start() (io.ReadCloser, error)
}

View File

@ -7,8 +7,10 @@ SPDX-License-Identifier: AGPL-3.0-only
package initserver
import (
"bytes"
"context"
"errors"
"io"
"net"
"strings"
"sync"
@ -29,6 +31,7 @@ import (
"github.com/stretchr/testify/require"
"go.uber.org/goleak"
"golang.org/x/crypto/bcrypt"
"google.golang.org/grpc"
)
func TestMain(m *testing.M) {
@ -99,6 +102,8 @@ func TestInit(t *testing.T) {
disk encryptedDisk
fileHandler file.Handler
req *initproto.InitRequest
stream stubStream
logCollector stubJournaldCollector
initSecretHash []byte
wantErr bool
wantShutdown bool
@ -110,6 +115,8 @@ func TestInit(t *testing.T) {
fileHandler: file.NewHandler(afero.NewMemMapFs()),
initSecretHash: initSecretHash,
req: &initproto.InitRequest{InitSecret: initSecret, KmsUri: masterSecret.EncodeToURI(), StorageUri: uri.NoStoreURI},
stream: stubStream{},
logCollector: stubJournaldCollector{logPipe: &stubReadCloser{reader: bytes.NewReader([]byte{})}},
wantShutdown: true,
},
"node locked": {
@ -118,6 +125,8 @@ func TestInit(t *testing.T) {
disk: &stubDisk{},
fileHandler: file.NewHandler(afero.NewMemMapFs()),
req: &initproto.InitRequest{InitSecret: initSecret, KmsUri: masterSecret.EncodeToURI(), StorageUri: uri.NoStoreURI},
stream: stubStream{},
logCollector: stubJournaldCollector{logPipe: &stubReadCloser{reader: bytes.NewReader([]byte{})}},
initSecretHash: initSecretHash,
wantErr: true,
},
@ -127,6 +136,8 @@ func TestInit(t *testing.T) {
disk: &stubDisk{openErr: someErr},
fileHandler: file.NewHandler(afero.NewMemMapFs()),
req: &initproto.InitRequest{InitSecret: initSecret, KmsUri: masterSecret.EncodeToURI(), StorageUri: uri.NoStoreURI},
stream: stubStream{},
logCollector: stubJournaldCollector{logPipe: &stubReadCloser{reader: bytes.NewReader([]byte{})}},
initSecretHash: initSecretHash,
wantErr: true,
wantShutdown: true,
@ -137,6 +148,8 @@ func TestInit(t *testing.T) {
disk: &stubDisk{uuidErr: someErr},
fileHandler: file.NewHandler(afero.NewMemMapFs()),
req: &initproto.InitRequest{InitSecret: initSecret, KmsUri: masterSecret.EncodeToURI(), StorageUri: uri.NoStoreURI},
stream: stubStream{},
logCollector: stubJournaldCollector{logPipe: &stubReadCloser{reader: bytes.NewReader([]byte{})}},
initSecretHash: initSecretHash,
wantErr: true,
wantShutdown: true,
@ -147,6 +160,8 @@ func TestInit(t *testing.T) {
disk: &stubDisk{updatePassphraseErr: someErr},
fileHandler: file.NewHandler(afero.NewMemMapFs()),
req: &initproto.InitRequest{InitSecret: initSecret, KmsUri: masterSecret.EncodeToURI(), StorageUri: uri.NoStoreURI},
stream: stubStream{},
logCollector: stubJournaldCollector{logPipe: &stubReadCloser{reader: bytes.NewReader([]byte{})}},
initSecretHash: initSecretHash,
wantErr: true,
wantShutdown: true,
@ -157,6 +172,8 @@ func TestInit(t *testing.T) {
disk: &stubDisk{},
fileHandler: file.NewHandler(afero.NewReadOnlyFs(afero.NewMemMapFs())),
req: &initproto.InitRequest{InitSecret: initSecret, KmsUri: masterSecret.EncodeToURI(), StorageUri: uri.NoStoreURI},
stream: stubStream{},
logCollector: stubJournaldCollector{logPipe: &stubReadCloser{reader: bytes.NewReader([]byte{})}},
initSecretHash: initSecretHash,
wantErr: true,
wantShutdown: true,
@ -167,6 +184,8 @@ func TestInit(t *testing.T) {
disk: &stubDisk{},
fileHandler: file.NewHandler(afero.NewMemMapFs()),
req: &initproto.InitRequest{InitSecret: initSecret, KmsUri: masterSecret.EncodeToURI(), StorageUri: uri.NoStoreURI},
stream: stubStream{},
logCollector: stubJournaldCollector{logPipe: &stubReadCloser{reader: bytes.NewReader([]byte{})}},
initSecretHash: initSecretHash,
wantErr: true,
wantShutdown: true,
@ -178,6 +197,8 @@ func TestInit(t *testing.T) {
fileHandler: file.NewHandler(afero.NewMemMapFs()),
initSecretHash: initSecretHash,
req: &initproto.InitRequest{InitSecret: []byte("wrongpassword")},
stream: stubStream{},
logCollector: stubJournaldCollector{logPipe: &stubReadCloser{reader: bytes.NewReader([]byte{})}},
wantErr: true,
},
}
@ -188,17 +209,18 @@ func TestInit(t *testing.T) {
serveStopper := newStubServeStopper()
server := &Server{
nodeLock: tc.nodeLock,
initializer: tc.initializer,
disk: tc.disk,
fileHandler: tc.fileHandler,
log: logger.NewTest(t),
grpcServer: serveStopper,
cleaner: &fakeCleaner{serveStopper: serveStopper},
initSecretHash: tc.initSecretHash,
nodeLock: tc.nodeLock,
initializer: tc.initializer,
disk: tc.disk,
fileHandler: tc.fileHandler,
log: logger.NewTest(t),
grpcServer: serveStopper,
cleaner: &fakeCleaner{serveStopper: serveStopper},
initSecretHash: tc.initSecretHash,
journaldCollector: &tc.logCollector,
}
kubeconfig, err := server.Init(context.Background(), tc.req)
err := server.Init(tc.req, &tc.stream)
if tc.wantErr {
assert.Error(err)
@ -214,13 +236,82 @@ func TestInit(t *testing.T) {
return
}
for _, res := range tc.stream.res {
assert.NotNil(res.GetInitSuccess())
}
assert.NoError(err)
assert.NotNil(kubeconfig)
assert.False(server.nodeLock.TryLockOnce(nil)) // lock should be locked
})
}
}
func TestSendLogsWithMessage(t *testing.T) {
someError := errors.New("failed")
testCases := map[string]struct {
logCollector journaldCollection
stream stubStream
failureMessage string
expectedResult string
expectedFailureMessage string
wantErr bool
}{
"success": {
logCollector: &stubJournaldCollector{logPipe: &stubReadCloser{reader: bytes.NewReader([]byte("asdf"))}},
stream: stubStream{},
failureMessage: "fdsa",
expectedResult: "asdf",
expectedFailureMessage: "fdsa",
},
"fail collection": {
logCollector: &stubJournaldCollector{collectErr: someError},
failureMessage: "fdsa",
wantErr: true,
expectedFailureMessage: "fdsa",
},
"fail to send": {
logCollector: &stubJournaldCollector{logPipe: &stubReadCloser{reader: bytes.NewReader([]byte("asdf"))}},
stream: stubStream{sendError: someError},
failureMessage: "fdsa",
wantErr: true,
expectedFailureMessage: "fdsa",
},
"fail to read": {
logCollector: &stubJournaldCollector{logPipe: &stubReadCloser{readErr: someError}},
failureMessage: "fdsa",
wantErr: true,
expectedFailureMessage: "fdsa",
},
}
for name, tc := range testCases {
t.Run(name, func(t *testing.T) {
assert := assert.New(t)
serverStopper := newStubServeStopper()
server := &Server{
grpcServer: serverStopper,
journaldCollector: tc.logCollector,
}
err := server.sendLogsWithMessage(&tc.stream, errors.New(tc.failureMessage))
if tc.wantErr {
assert.Error(err)
return
}
assert.Equal(tc.stream.res[0].GetInitFailure().GetError(), tc.expectedFailureMessage)
assert.NoError(err)
for _, res := range tc.stream.res[1:] {
assert.Equal(tc.expectedResult, string(res.GetLog().Log))
}
})
}
}
func TestSetupDisk(t *testing.T) {
testCases := map[string]struct {
uuid string
@ -372,3 +463,48 @@ type stubMetadata struct {
func (m *stubMetadata) InitSecretHash(context.Context) ([]byte, error) {
return m.initSecretHashVal, m.initSecretHashErr
}
type stubStream struct {
res []*initproto.InitResponse
sendError error
grpc.ServerStream
}
func (s *stubStream) Send(m *initproto.InitResponse) error {
if s.sendError == nil {
// we append here since we don't receive anything
// if that if doesn't trigger
s.res = append(s.res, m)
}
return s.sendError
}
func (s *stubStream) Context() context.Context {
return context.Background()
}
type stubJournaldCollector struct {
logPipe io.ReadCloser
collectErr error
}
func (s *stubJournaldCollector) Start() (io.ReadCloser, error) {
return s.logPipe, s.collectErr
}
type stubReadCloser struct {
reader io.Reader
readErr error
closeErr error
}
func (s *stubReadCloser) Read(p []byte) (n int, err error) {
if s.readErr != nil {
return 0, s.readErr
}
return s.reader.Read(p)
}
func (s *stubReadCloser) Close() error {
return s.closeErr
}

View File

@ -17,17 +17,6 @@ import (
"github.com/stretchr/testify/require"
)
func (s *stubReadCloser) Read(p []byte) (n int, err error) {
if s.readErr != nil {
return 0, s.readErr
}
return s.reader.Read(p)
}
func (s *stubReadCloser) Close() error {
return s.closeErr
}
func TestPipe(t *testing.T) {
someError := errors.New("failed")
@ -135,3 +124,14 @@ type stubReadCloser struct {
readErr error
closeErr error
}
func (s *stubReadCloser) Read(p []byte) (n int, err error) {
if s.readErr != nil {
return 0, s.readErr
}
return s.reader.Read(p)
}
func (s *stubReadCloser) Close() error {
return s.closeErr
}

View File

@ -68,9 +68,11 @@ func NewInitCmd() *cobra.Command {
}
type initCmd struct {
log debugLog
merger configMerger
spinner spinnerInterf
log debugLog
merger configMerger
spinner spinnerInterf
masterSecret uri.MasterSecret
fh *file.Handler
}
// runInitialize runs the initialize command.
@ -94,7 +96,7 @@ func runInitialize(cmd *cobra.Command, _ []string) error {
ctx, cancel := context.WithTimeout(cmd.Context(), time.Hour)
defer cancel()
cmd.SetContext(ctx)
i := &initCmd{log: log, spinner: spinner, merger: &kubeconfigMerger{log: log}}
i := &initCmd{log: log, spinner: spinner, merger: &kubeconfigMerger{log: log}, fh: &fileHandler}
return i.initialize(cmd, newDialer, fileHandler, license.NewClient())
}
@ -153,6 +155,7 @@ func (i *initCmd) initialize(cmd *cobra.Command, newDialer func(validator atls.V
}
i.log.Debugf("Successfully marshaled service account URI")
masterSecret, err := i.readOrGenerateMasterSecret(cmd.OutOrStdout(), fileHandler, flags.masterSecretPath)
i.masterSecret = masterSecret
if err != nil {
return fmt.Errorf("parsing or generating master secret from file %s: %w", flags.masterSecretPath, err)
}
@ -183,11 +186,13 @@ func (i *initCmd) initialize(cmd *cobra.Command, newDialer func(validator atls.V
i.log.Debugf("Sending initialization request")
resp, err := i.initCall(cmd.Context(), newDialer(validator), idFile.IP, req)
i.spinner.Stop()
if err != nil {
var nonRetriable *nonRetriableError
if errors.As(err, &nonRetriable) {
cmd.PrintErrln("Cluster initialization failed. This error is not recoverable.")
cmd.PrintErrln("Terminate your cluster and try again.")
cmd.PrintErrf("The cluster logs were saved to %q\n", constants.ErrorLog)
}
return err
}
@ -198,13 +203,14 @@ func (i *initCmd) initialize(cmd *cobra.Command, newDialer func(validator atls.V
return i.writeOutput(idFile, resp, flags.mergeConfigs, cmd.OutOrStdout(), fileHandler)
}
func (i *initCmd) initCall(ctx context.Context, dialer grpcDialer, ip string, req *initproto.InitRequest) (*initproto.InitResponse, error) {
func (i *initCmd) initCall(ctx context.Context, dialer grpcDialer, ip string, req *initproto.InitRequest) (*initproto.InitSuccessResponse, error) {
doer := &initDoer{
dialer: dialer,
endpoint: net.JoinHostPort(ip, strconv.Itoa(constants.BootstrapperPort)),
req: req,
log: i.log,
spinner: i.spinner,
fh: file.NewHandler(afero.NewOsFs()),
}
// Create a wrapper function that allows logging any returned error from the retrier before checking if it's the expected retriable one.
@ -226,10 +232,11 @@ type initDoer struct {
dialer grpcDialer
endpoint string
req *initproto.InitRequest
resp *initproto.InitResponse
resp *initproto.InitSuccessResponse
log debugLog
spinner spinnerInterf
connectedOnce bool
fh file.Handler
}
func (d *initDoer) Do(ctx context.Context) error {
@ -241,7 +248,7 @@ func (d *initDoer) Do(ctx context.Context) error {
conn, err := d.dialer.Dial(ctx, d.endpoint)
if err != nil {
d.log.Debugf("Dialing init server failed: %w. Retrying...", err)
d.log.Debugf("Dialing init server failed: %s. Retrying...", err)
return fmt.Errorf("dialing init server: %w", err)
}
defer conn.Close()
@ -259,7 +266,54 @@ func (d *initDoer) Do(ctx context.Context) error {
if err != nil {
return &nonRetriableError{fmt.Errorf("init call: %w", err)}
}
d.resp = resp
res, err := resp.Recv() // get first response, either success or failure
if err != nil {
if e := d.getLogs(resp); e != nil {
d.log.Debugf("Failed to collect logs: %s", e)
}
return &nonRetriableError{err}
}
switch res.Kind.(type) {
case *initproto.InitResponse_InitFailure:
if e := d.getLogs(resp); e != nil {
d.log.Debugf("Failed to get logs from cluster: %s", e)
}
return &nonRetriableError{errors.New(res.GetInitFailure().GetError())}
case *initproto.InitResponse_InitSuccess:
d.resp = res.GetInitSuccess()
case nil:
d.log.Debugf("Cluster returned nil response type")
return &nonRetriableError{errors.New("empty response from cluster")}
default:
d.log.Debugf("Cluster returned unknown response type")
return &nonRetriableError{errors.New("unknown response from cluster")}
}
return nil
}
func (d *initDoer) getLogs(resp initproto.API_InitClient) error {
d.log.Debugf("Attempting to collect cluster logs")
for {
res, err := resp.Recv()
if err == io.EOF {
break
}
if err != nil {
return err
}
log := res.GetLog().GetLog()
if log == nil {
return errors.New("sent empty logs")
}
if err := d.fh.Write(constants.ErrorLog, log, file.OptAppend); err != nil {
return err
}
}
return nil
}
@ -284,13 +338,13 @@ func (d *initDoer) handleGRPCStateChanges(ctx context.Context, wg *sync.WaitGrou
}
func (i *initCmd) writeOutput(
idFile clusterid.File, resp *initproto.InitResponse, mergeConfig bool, wr io.Writer, fileHandler file.Handler,
idFile clusterid.File, initResp *initproto.InitSuccessResponse, mergeConfig bool, wr io.Writer, fileHandler file.Handler,
) error {
fmt.Fprint(wr, "Your Constellation cluster was successfully initialized.\n\n")
ownerID := hex.EncodeToString(resp.OwnerId)
ownerID := hex.EncodeToString(initResp.GetOwnerId())
// i.log.Debugf("Owner id is %s", ownerID)
clusterID := hex.EncodeToString(resp.ClusterId)
clusterID := hex.EncodeToString(initResp.GetClusterId())
tw := tabwriter.NewWriter(wr, 0, 0, 2, ' ', 0)
// writeRow(tw, "Constellation cluster's owner identifier", ownerID)
@ -299,7 +353,7 @@ func (i *initCmd) writeOutput(
tw.Flush()
fmt.Fprintln(wr)
if err := fileHandler.Write(constants.AdminConfFilename, resp.Kubeconfig, file.OptNone); err != nil {
if err := fileHandler.Write(constants.AdminConfFilename, initResp.GetKubeconfig(), file.OptNone); err != nil {
return fmt.Errorf("writing kubeconfig: %w", err)
}
i.log.Debugf("Kubeconfig written to %s", constants.AdminConfFilename)

View File

@ -12,6 +12,7 @@ import (
"encoding/hex"
"encoding/json"
"errors"
"io"
"net"
"strconv"
"strings"
@ -54,7 +55,7 @@ func TestInitialize(t *testing.T) {
gcpServiceAccKey := &gcpshared.ServiceAccountKey{
Type: "service_account",
}
testInitResp := &initproto.InitResponse{
testInitResp := &initproto.InitSuccessResponse{
Kubeconfig: []byte("kubeconfig"),
OwnerId: []byte("ownerID"),
ClusterId: []byte("clusterID"),
@ -77,17 +78,17 @@ func TestInitialize(t *testing.T) {
idFile: &clusterid.File{IP: "192.0.2.1"},
configMutator: func(c *config.Config) { c.Provider.GCP.ServiceAccountKeyPath = serviceAccPath },
serviceAccKey: gcpServiceAccKey,
initServerAPI: &stubInitServer{initResp: testInitResp},
initServerAPI: &stubInitServer{res: &initproto.InitResponse{Kind: &initproto.InitResponse_InitSuccess{InitSuccess: testInitResp}}},
},
"initialize some azure instances": {
provider: cloudprovider.Azure,
idFile: &clusterid.File{IP: "192.0.2.1"},
initServerAPI: &stubInitServer{initResp: testInitResp},
initServerAPI: &stubInitServer{res: &initproto.InitResponse{Kind: &initproto.InitResponse_InitSuccess{InitSuccess: testInitResp}}},
},
"initialize some qemu instances": {
provider: cloudprovider.QEMU,
idFile: &clusterid.File{IP: "192.0.2.1"},
initServerAPI: &stubInitServer{initResp: testInitResp},
initServerAPI: &stubInitServer{res: &initproto.InitResponse{Kind: &initproto.InitResponse_InitSuccess{InitSuccess: testInitResp}}},
},
"non retriable error": {
provider: cloudprovider.QEMU,
@ -119,7 +120,7 @@ func TestInitialize(t *testing.T) {
"k8s version without v works": {
provider: cloudprovider.Azure,
idFile: &clusterid.File{IP: "192.0.2.1"},
initServerAPI: &stubInitServer{initResp: testInitResp},
initServerAPI: &stubInitServer{res: &initproto.InitResponse{Kind: &initproto.InitResponse_InitSuccess{InitSuccess: testInitResp}}},
configMutator: func(c *config.Config) { c.KubernetesVersion = strings.TrimPrefix(string(versions.Default), "v") },
},
}
@ -200,18 +201,79 @@ func TestInitialize(t *testing.T) {
}
}
func TestGetLogs(t *testing.T) {
someErr := errors.New("failed")
testCases := map[string]struct {
resp initproto.API_InitClient
fh file.Handler
wantedOutput []byte
wantErr bool
}{
"success": {
resp: stubInitClient{res: bytes.NewReader([]byte("asdf"))},
fh: file.NewHandler(afero.NewMemMapFs()),
wantedOutput: []byte("asdf"),
},
"receive error": {
resp: stubInitClient{err: someErr},
fh: file.NewHandler(afero.NewMemMapFs()),
wantErr: true,
},
"nil log": {
resp: stubInitClient{res: bytes.NewReader([]byte{1}), setResNil: true},
fh: file.NewHandler(afero.NewMemMapFs()),
wantErr: true,
},
"failed write": {
resp: stubInitClient{res: bytes.NewReader([]byte("asdf"))},
fh: file.NewHandler(afero.NewReadOnlyFs(afero.NewMemMapFs())),
wantErr: true,
},
}
for name, tc := range testCases {
t.Run(name, func(t *testing.T) {
assert := assert.New(t)
doer := initDoer{
fh: tc.fh,
log: logger.NewTest(t),
}
err := doer.getLogs(tc.resp)
if tc.wantErr {
assert.Error(err)
}
text, err := tc.fh.Read(constants.ErrorLog)
if tc.wantedOutput == nil {
assert.Error(err)
}
assert.Equal(tc.wantedOutput, text)
})
}
}
func TestWriteOutput(t *testing.T) {
assert := assert.New(t)
require := require.New(t)
resp := &initproto.InitResponse{
OwnerId: []byte("ownerID"),
ClusterId: []byte("clusterID"),
Kubeconfig: []byte("kubeconfig"),
Kind: &initproto.InitResponse_InitSuccess{
InitSuccess: &initproto.InitSuccessResponse{
OwnerId: []byte("ownerID"),
ClusterId: []byte("clusterID"),
Kubeconfig: []byte("kubeconfig"),
},
},
}
ownerID := hex.EncodeToString(resp.OwnerId)
clusterID := hex.EncodeToString(resp.ClusterId)
ownerID := hex.EncodeToString(resp.GetInitSuccess().GetOwnerId())
clusterID := hex.EncodeToString(resp.GetInitSuccess().GetClusterId())
expectedIDFile := clusterid.File{
ClusterID: clusterID,
@ -232,7 +294,7 @@ func TestWriteOutput(t *testing.T) {
log: logger.NewTest(t),
merger: &stubMerger{},
}
err := i.writeOutput(idFile, resp, false, &out, fileHandler)
err := i.writeOutput(idFile, resp.GetInitSuccess(), false, &out, fileHandler)
require.NoError(err)
// assert.Contains(out.String(), ownerID)
assert.Contains(out.String(), clusterID)
@ -241,7 +303,7 @@ func TestWriteOutput(t *testing.T) {
afs := afero.Afero{Fs: testFs}
adminConf, err := afs.ReadFile(constants.AdminConfFilename)
assert.NoError(err)
assert.Equal(string(resp.Kubeconfig), string(adminConf))
assert.Equal(string(resp.GetInitSuccess().GetKubeconfig()), string(adminConf))
idsFile, err := afs.ReadFile(constants.ClusterIDsFileName)
assert.NoError(err)
@ -253,7 +315,7 @@ func TestWriteOutput(t *testing.T) {
// test config merging
out.Reset()
require.NoError(afs.Remove(constants.AdminConfFilename))
err = i.writeOutput(idFile, resp, true, &out, fileHandler)
err = i.writeOutput(idFile, resp.GetInitSuccess(), true, &out, fileHandler)
require.NoError(err)
// assert.Contains(out.String(), ownerID)
assert.Contains(out.String(), clusterID)
@ -265,7 +327,7 @@ func TestWriteOutput(t *testing.T) {
i.merger = &stubMerger{envVar: "/some/path/to/kubeconfig"}
out.Reset()
require.NoError(afs.Remove(constants.AdminConfFilename))
err = i.writeOutput(idFile, resp, true, &out, fileHandler)
err = i.writeOutput(idFile, resp.GetInitSuccess(), true, &out, fileHandler)
require.NoError(err)
// assert.Contains(out.String(), ownerID)
assert.Contains(out.String(), clusterID)
@ -389,10 +451,14 @@ func TestAttestation(t *testing.T) {
assert := assert.New(t)
require := require.New(t)
initServerAPI := &stubInitServer{initResp: &initproto.InitResponse{
Kubeconfig: []byte("kubeconfig"),
OwnerId: []byte("ownerID"),
ClusterId: []byte("clusterID"),
initServerAPI := &stubInitServer{res: &initproto.InitResponse{
Kind: &initproto.InitResponse_InitSuccess{
InitSuccess: &initproto.InitSuccessResponse{
Kubeconfig: []byte("kubeconfig"),
OwnerId: []byte("ownerID"),
ClusterId: []byte("clusterID"),
},
},
}}
existingIDFile := &clusterid.File{IP: "192.0.2.4", CloudProvider: cloudprovider.QEMU}
@ -499,14 +565,15 @@ func (i *testIssuer) Issue(_ context.Context, userData []byte, _ []byte) ([]byte
}
type stubInitServer struct {
initResp *initproto.InitResponse
initErr error
res *initproto.InitResponse
initErr error
initproto.UnimplementedAPIServer
}
func (s *stubInitServer) Init(_ context.Context, _ *initproto.InitRequest) (*initproto.InitResponse, error) {
return s.initResp, s.initErr
func (s *stubInitServer) Init(_ *initproto.InitRequest, stream initproto.API_InitServer) error {
_ = stream.Send(s.res)
return s.initErr
}
type stubMerger struct {
@ -565,3 +632,39 @@ func (c *stubLicenseClient) QuotaCheck(_ context.Context, _ license.QuotaCheckRe
Quota: 25,
}, nil
}
type stubInitClient struct {
res io.Reader
err error
setResNil bool
grpc.ClientStream
}
func (c stubInitClient) Recv() (*initproto.InitResponse, error) {
if c.err != nil {
return &initproto.InitResponse{}, c.err
}
text := make([]byte, 1024)
n, err := c.res.Read(text)
text = text[:n]
res := &initproto.InitResponse{
Kind: &initproto.InitResponse_Log{
Log: &initproto.LogResponseType{
Log: text,
},
},
}
if c.setResNil {
res = &initproto.InitResponse{
Kind: &initproto.InitResponse_Log{
Log: &initproto.LogResponseType{
Log: nil,
},
},
}
}
return res, err
}

View File

@ -83,6 +83,8 @@ const (
TerraformIAMWorkingDir = "constellation-iam-terraform"
// GCPServiceAccountKeyFile is the file name for the GCP service account key file.
GCPServiceAccountKeyFile = "gcpServiceAccountKey.json"
// ErrorLog file which contains server errors during init.
ErrorLog = "constellation-cluster.log"
// ControlPlaneAdminConfFilename filepath to control plane kubernetes admin config.
ControlPlaneAdminConfFilename = "/etc/kubernetes/admin.conf"
// KubectlPath path to kubectl binary.

View File

@ -49,6 +49,8 @@ const (
optOverwrite
// OptMkdirAll creates the path to the file.
optMkdirAll
// OptAppend appends to the file.
optAppend
)
var (
@ -58,6 +60,8 @@ var (
OptOverwrite = Option{optOverwrite}
// OptMkdirAll creates the path to the file.
OptMkdirAll = Option{optMkdirAll}
// OptAppend appends to the file.
OptAppend = Option{optAppend}
)
// Handler handles file interaction.
@ -92,6 +96,9 @@ func (h *Handler) Write(name string, data []byte, options ...Option) error {
if hasOption(options, OptOverwrite) {
flags = os.O_WRONLY | os.O_CREATE | os.O_TRUNC
}
if hasOption(options, OptAppend) {
flags = os.O_WRONLY | os.O_CREATE | os.O_APPEND
}
file, err := h.fs.OpenFile(name, flags, 0o600)
if err != nil {
return err

View File

@ -24,6 +24,74 @@ func TestMain(m *testing.M) {
goleak.VerifyTestMain(m)
}
func TestWrite(t *testing.T) {
testCases := map[string]struct {
fs afero.Fs
setupFs func(af afero.Afero) error
name string
content string
expectedContent string
options Option
wantErr bool
wantAppend bool
}{
"successful write": {
fs: afero.NewMemMapFs(),
content: "asdf",
expectedContent: "asdf",
name: "somedir/somefile",
},
"successful overwrite": {
fs: afero.NewMemMapFs(),
setupFs: func(af afero.Afero) error { return af.WriteFile("somedir/somefile", []byte{}, 0o644) },
content: "asdf",
expectedContent: "asdf",
name: "somedir/somefile",
options: OptOverwrite,
},
"successful append": {
fs: afero.NewMemMapFs(),
setupFs: func(af afero.Afero) error { return af.WriteFile("somedir/somefile", []byte("fdsa"), 0o644) },
content: "asdf",
expectedContent: "fdsaasdf",
name: "somedir/somefile",
options: OptAppend,
},
"read only fs": {
fs: afero.NewReadOnlyFs(afero.NewMemMapFs()),
name: "somedir/somefile",
wantErr: true,
},
"file already exists": {
fs: afero.NewMemMapFs(),
setupFs: func(af afero.Afero) error { return af.WriteFile("somedir/somefile", []byte{}, 0o644) },
name: "somedir/somefile",
wantErr: true,
},
}
for name, tc := range testCases {
t.Run(name, func(t *testing.T) {
assert := assert.New(t)
require := require.New(t)
handler := NewHandler(tc.fs)
if tc.setupFs != nil {
require.NoError(tc.setupFs(afero.Afero{Fs: tc.fs}))
}
if tc.wantErr {
assert.Error(handler.Write(tc.name, []byte(tc.content), tc.options))
} else {
assert.NoError(handler.Write(tc.name, []byte(tc.content), tc.options))
content, err := handler.Read(tc.name)
require.NoError(err)
assert.Equal(tc.expectedContent, string(content))
}
})
}
}
func TestReadJSON(t *testing.T) {
type testContent struct {
First string

View File

@ -121,6 +121,7 @@ type fakeAPI struct {
initproto.UnimplementedAPIServer
}
func (f *fakeAPI) Init(_ context.Context, _ *initproto.InitRequest) (*initproto.InitResponse, error) {
return &initproto.InitResponse{}, nil
func (f *fakeAPI) Init(_ *initproto.InitRequest, stream initproto.API_InitServer) error {
_ = stream.Send(&initproto.InitResponse{})
return nil
}