From 79bf68fb69372320bff3dd01819dd1204850e65f Mon Sep 17 00:00:00 2001 From: "a.ugodnikov" Date: Tue, 30 Jan 2024 12:48:40 +0300 Subject: [PATCH 1/4] feat: add name in stats --- reader.go | 12 +++++++----- writer.go | 10 +++++++--- 2 files changed, 14 insertions(+), 8 deletions(-) diff --git a/reader.go b/reader.go index 39bcb49..db85a00 100644 --- a/reader.go +++ b/reader.go @@ -75,7 +75,8 @@ type ReaderConfig struct { } type ConsumeConfig struct { - Limit int64 `json:"limit"` + Limit int64 `json:"limit"` + Name string `json:"name"` } type Duration struct { @@ -331,7 +332,7 @@ func (k *Kafka) consume( msg, err := reader.ReadMessage(ctxWithTimeout) cancel() if errors.Is(err, io.EOF) { - k.reportReaderStats(reader.Stats()) + k.reportReaderStats(reader.Stats(), consumeConfig.Name) err = NewXk6KafkaError(noMoreMessages, "No more messages.", nil) logger.WithField("error", err).Info(err) @@ -339,7 +340,7 @@ func (k *Kafka) consume( } if err != nil { - k.reportReaderStats(reader.Stats()) + k.reportReaderStats(reader.Stats(), consumeConfig.Name) err = NewXk6KafkaError(failedReadMessage, "Unable to read messages.", nil) logger.WithField("error", err).Error(err) @@ -376,13 +377,13 @@ func (k *Kafka) consume( messages = append(messages, message) } - k.reportReaderStats(reader.Stats()) + k.reportReaderStats(reader.Stats(), consumeConfig.Name) return messages } // reportReaderStats reports the reader stats // nolint:funlen -func (k *Kafka) reportReaderStats(currentStats kafkago.ReaderStats) { +func (k *Kafka) reportReaderStats(currentStats kafkago.ReaderStats, name string) { state := k.vu.State() if state == nil { logger.WithField("error", ErrForbiddenInInitContext).Error(ErrForbiddenInInitContext) @@ -400,6 +401,7 @@ func (k *Kafka) reportReaderStats(currentStats kafkago.ReaderStats) { sampleTags := ctm.Tags.With("topic", currentStats.Topic) sampleTags = sampleTags.With("clientid", currentStats.ClientID) sampleTags = sampleTags.With("partition", currentStats.Partition) + sampleTags = sampleTags.With("name", name) now := time.Now() metrics.PushIfNotDone(ctx, state.Samples, metrics.ConnectedSamples{ diff --git a/writer.go b/writer.go index b6cbb15..cd4e995 100644 --- a/writer.go +++ b/writer.go @@ -62,6 +62,7 @@ type Message struct { Key []byte `json:"key"` Value []byte `json:"value"` Headers map[string]interface{} `json:"headers"` + Name string `json:"name"` // If not set at the creation, Time will be automatically set when // writing the message. @@ -207,7 +208,10 @@ func (k *Kafka) produce(writer *kafkago.Writer, produceConfig *ProduceConfig) { } kafkaMessages := make([]kafkago.Message, len(produceConfig.Messages)) + kafkaMessagesNames := make([]string, len(produceConfig.Messages)) for index, message := range produceConfig.Messages { + kafkaMessagesNames[index] = message.Name + kafkaMessages[index] = kafkago.Message{ Offset: message.Offset, } @@ -247,7 +251,7 @@ func (k *Kafka) produce(writer *kafkago.Writer, produceConfig *ProduceConfig) { originalErr := writer.WriteMessages(k.vu.Context(), kafkaMessages...) - k.reportWriterStats(writer.Stats()) + k.reportWriterStats(writer.Stats(), kafkaMessagesNames) if originalErr != nil { err := NewXk6KafkaError(writerError, "Error writing messages.", originalErr) @@ -258,7 +262,7 @@ func (k *Kafka) produce(writer *kafkago.Writer, produceConfig *ProduceConfig) { // reportWriterStats reports the writer stats to the state. // nolint: funlen -func (k *Kafka) reportWriterStats(currentStats kafkago.WriterStats) { +func (k *Kafka) reportWriterStats(currentStats kafkago.WriterStats, names []string) { state := k.vu.State() if state == nil { logger.WithField("error", ErrForbiddenInInitContext).Error(ErrForbiddenInInitContext) @@ -273,7 +277,7 @@ func (k *Kafka) reportWriterStats(currentStats kafkago.WriterStats) { } ctm := k.vu.State().Tags.GetCurrentValues() - sampleTags := ctm.Tags.With("topic", currentStats.Topic) + sampleTags := ctm.Tags.With("topic", currentStats.Topic).With("name", names[1]) now := time.Now() From c8fed985b2d520c72a42b0a75b9ef69a4b307fbe Mon Sep 17 00:00:00 2001 From: "a.ugodnikov" Date: Tue, 30 Jan 2024 12:53:31 +0300 Subject: [PATCH 2/4] feat: add name in stats --- go.mod | 2 +- 1 file changed, 1 insertion(+), 1 deletion(-) diff --git a/go.mod b/go.mod index e52b9d1..27fcc0b 100644 --- a/go.mod +++ b/go.mod @@ -1,4 +1,4 @@ -module github.com/mostafa/xk6-kafka +module github.com/daylikon/xk6-kafka go 1.21 From a46694bcd4e2c4c0e221ad5b70fb3e33ee8a59d6 Mon Sep 17 00:00:00 2001 From: "a.ugodnikov" Date: Wed, 31 Jan 2024 13:06:22 +0300 Subject: [PATCH 3/4] chore: refactor --- reader.go | 11 ++++++----- writer.go | 11 +++++------ 2 files changed, 11 insertions(+), 11 deletions(-) diff --git a/reader.go b/reader.go index db85a00..83410eb 100644 --- a/reader.go +++ b/reader.go @@ -324,6 +324,7 @@ func (k *Kafka) consume( } messages := make([]map[string]interface{}, 0) + messagesName := consumeConfig.Name maxWait := reader.Config().MaxWait @@ -332,7 +333,7 @@ func (k *Kafka) consume( msg, err := reader.ReadMessage(ctxWithTimeout) cancel() if errors.Is(err, io.EOF) { - k.reportReaderStats(reader.Stats(), consumeConfig.Name) + k.reportReaderStats(reader.Stats(), messagesName) err = NewXk6KafkaError(noMoreMessages, "No more messages.", nil) logger.WithField("error", err).Info(err) @@ -340,7 +341,7 @@ func (k *Kafka) consume( } if err != nil { - k.reportReaderStats(reader.Stats(), consumeConfig.Name) + k.reportReaderStats(reader.Stats(), messagesName) err = NewXk6KafkaError(failedReadMessage, "Unable to read messages.", nil) logger.WithField("error", err).Error(err) @@ -377,13 +378,13 @@ func (k *Kafka) consume( messages = append(messages, message) } - k.reportReaderStats(reader.Stats(), consumeConfig.Name) + k.reportReaderStats(reader.Stats(), messagesName) return messages } // reportReaderStats reports the reader stats // nolint:funlen -func (k *Kafka) reportReaderStats(currentStats kafkago.ReaderStats, name string) { +func (k *Kafka) reportReaderStats(currentStats kafkago.ReaderStats, messagesName string) { state := k.vu.State() if state == nil { logger.WithField("error", ErrForbiddenInInitContext).Error(ErrForbiddenInInitContext) @@ -401,7 +402,7 @@ func (k *Kafka) reportReaderStats(currentStats kafkago.ReaderStats, name string) sampleTags := ctm.Tags.With("topic", currentStats.Topic) sampleTags = sampleTags.With("clientid", currentStats.ClientID) sampleTags = sampleTags.With("partition", currentStats.Partition) - sampleTags = sampleTags.With("name", name) + sampleTags = sampleTags.With("name", messagesName) now := time.Now() metrics.PushIfNotDone(ctx, state.Samples, metrics.ConnectedSamples{ diff --git a/writer.go b/writer.go index cd4e995..4845672 100644 --- a/writer.go +++ b/writer.go @@ -208,10 +208,8 @@ func (k *Kafka) produce(writer *kafkago.Writer, produceConfig *ProduceConfig) { } kafkaMessages := make([]kafkago.Message, len(produceConfig.Messages)) - kafkaMessagesNames := make([]string, len(produceConfig.Messages)) + kafkaMessagesName := produceConfig.Messages[0].Name for index, message := range produceConfig.Messages { - kafkaMessagesNames[index] = message.Name - kafkaMessages[index] = kafkago.Message{ Offset: message.Offset, } @@ -251,7 +249,7 @@ func (k *Kafka) produce(writer *kafkago.Writer, produceConfig *ProduceConfig) { originalErr := writer.WriteMessages(k.vu.Context(), kafkaMessages...) - k.reportWriterStats(writer.Stats(), kafkaMessagesNames) + k.reportWriterStats(writer.Stats(), kafkaMessagesName) if originalErr != nil { err := NewXk6KafkaError(writerError, "Error writing messages.", originalErr) @@ -262,7 +260,7 @@ func (k *Kafka) produce(writer *kafkago.Writer, produceConfig *ProduceConfig) { // reportWriterStats reports the writer stats to the state. // nolint: funlen -func (k *Kafka) reportWriterStats(currentStats kafkago.WriterStats, names []string) { +func (k *Kafka) reportWriterStats(currentStats kafkago.WriterStats, messagesName string) { state := k.vu.State() if state == nil { logger.WithField("error", ErrForbiddenInInitContext).Error(ErrForbiddenInInitContext) @@ -277,7 +275,8 @@ func (k *Kafka) reportWriterStats(currentStats kafkago.WriterStats, names []stri } ctm := k.vu.State().Tags.GetCurrentValues() - sampleTags := ctm.Tags.With("topic", currentStats.Topic).With("name", names[1]) + sampleTags := ctm.Tags.With("topic", currentStats.Topic) + sampleTags = sampleTags.With("name", messagesName) now := time.Now() From fb4964f63d35b5c8473fe16a55bbbc5d8a0bc06b Mon Sep 17 00:00:00 2001 From: "a.ugodnikov" Date: Wed, 31 Jan 2024 13:21:58 +0300 Subject: [PATCH 4/4] fix: change go.mod for PR --- go.mod | 2 +- 1 file changed, 1 insertion(+), 1 deletion(-) diff --git a/go.mod b/go.mod index 27fcc0b..e52b9d1 100644 --- a/go.mod +++ b/go.mod @@ -1,4 +1,4 @@ -module github.com/daylikon/xk6-kafka +module github.com/mostafa/xk6-kafka go 1.21