Skip to content
New issue

Have a question about this project? Sign up for a free GitHub account to open an issue and contact its maintainers and the community.

By clicking “Sign up for GitHub”, you agree to our terms of service and privacy statement. We’ll occasionally send you account related emails.

Already on GitHub? Sign in to your account

feat: Update Zeebe to 8.2.0-alpha4 #266

Merged
merged 4 commits into from
Feb 21, 2023
Merged
Show file tree
Hide file tree
Changes from all commits
Commits
File filter

Filter by extension

Filter by extension


Conversations
Failed to load comments.
Loading
Jump to
Jump to file
Failed to load files.
Loading
Diff view
Diff view
Original file line number Diff line number Diff line change
Expand Up @@ -7,7 +7,6 @@
*/
package org.camunda.community.eze

import io.camunda.zeebe.engine.processing.streamprocessor.TypedEventRegistry
import io.camunda.zeebe.exporter.api.Exporter
import io.camunda.zeebe.logstreams.log.LogStreamReader
import io.camunda.zeebe.protocol.impl.record.CopiedRecord
Expand All @@ -16,6 +15,7 @@ import io.camunda.zeebe.protocol.record.Record
import io.camunda.zeebe.scheduler.ActorScheduler
import io.camunda.zeebe.scheduler.clock.ActorClock
import io.camunda.zeebe.scheduler.clock.ControlledActorClock
import io.camunda.zeebe.stream.impl.TypedEventRegistry
import org.camunda.community.eze.engine.ExporterRunner
import org.camunda.community.eze.engine.EzeLogStreamFactory
import org.camunda.community.eze.engine.EzeStreamProcessorFactory
Expand Down
Original file line number Diff line number Diff line change
Expand Up @@ -9,7 +9,6 @@ package org.camunda.community.eze.db

import io.camunda.zeebe.db.*
import io.camunda.zeebe.db.impl.DbNil
import io.camunda.zeebe.engine.state.ZbColumnFamilies
import java.io.File
import java.util.*

Expand Down
Original file line number Diff line number Diff line change
Expand Up @@ -9,13 +9,12 @@ package org.camunda.community.eze.db

import io.camunda.zeebe.db.ZeebeDb
import io.camunda.zeebe.db.ZeebeDbFactory
import io.camunda.zeebe.engine.state.ZbColumnFamilies
import java.io.File

class EzeDbFactory<ColumnFamilyType : Enum<ColumnFamilyType>> : ZeebeDbFactory<ColumnFamilyType> {

companion object {
fun <ColumnFamilyType : Enum<ColumnFamilyType>> newFactory() : ZeebeDbFactory<ColumnFamilyType>{
fun <ColumnFamilyType : Enum<ColumnFamilyType>> newFactory(): ZeebeDbFactory<ColumnFamilyType> {
return EzeDbFactory()
}
}
Expand Down
Original file line number Diff line number Diff line change
Expand Up @@ -16,6 +16,7 @@ import io.camunda.zeebe.protocol.record.Record
import org.slf4j.Logger
import org.slf4j.LoggerFactory
import java.time.Duration
import java.util.*
import java.util.concurrent.Executors
import java.util.concurrent.TimeUnit

Expand Down Expand Up @@ -97,6 +98,14 @@ class ExporterRunner(
// currently, not needed
}

override fun updateLastExportedRecordPosition(position: Long, metadata: ByteArray?) {
// currently, not needed
}

override fun readMetadata(): Optional<ByteArray> {
return Optional.empty()
}

override fun scheduleCancellableTask(delay: Duration, task: Runnable): ScheduledTask {
val future =
executor.scheduleWithFixedDelay(task, 0, delay.toMillis(), TimeUnit.MILLISECONDS)
Expand Down
Original file line number Diff line number Diff line change
Expand Up @@ -10,12 +10,12 @@ package org.camunda.community.eze.engine
import io.camunda.zeebe.logstreams.log.LogRecordAwaiter
import io.camunda.zeebe.logstreams.log.LogStream
import io.camunda.zeebe.logstreams.log.LogStreamReader
import io.camunda.zeebe.logstreams.log.LogStreamRecordWriter
import io.camunda.zeebe.logstreams.log.LogStreamWriter

class EzeLogStream(private val logStream: LogStream) {

fun createWriter(): LogStreamRecordWriter {
return logStream.newLogStreamRecordWriter().join()
fun createWriter(): LogStreamWriter {
return logStream.newLogStreamWriter().join()
}

fun createReader(): LogStreamReader {
Expand Down
Original file line number Diff line number Diff line change
Expand Up @@ -9,18 +9,17 @@ package org.camunda.community.eze.engine

import io.camunda.zeebe.db.ZeebeDb
import io.camunda.zeebe.engine.Engine
import io.camunda.zeebe.engine.api.CommandResponseWriter
import io.camunda.zeebe.engine.api.InterPartitionCommandSender
import io.camunda.zeebe.engine.processing.EngineProcessors
import io.camunda.zeebe.engine.processing.deployment.distribute.DeploymentDistributionCommandSender
import io.camunda.zeebe.engine.processing.message.command.SubscriptionCommandSender
import io.camunda.zeebe.engine.processing.streamprocessor.TypedRecordProcessorContext
import io.camunda.zeebe.engine.processing.streamprocessor.TypedRecordProcessors
import io.camunda.zeebe.engine.state.ZbColumnFamilies
import io.camunda.zeebe.engine.state.appliers.EventAppliers
import io.camunda.zeebe.protocol.ZbColumnFamilies
import io.camunda.zeebe.scheduler.ActorSchedulingService
import io.camunda.zeebe.streamprocessor.StreamProcessor
import io.camunda.zeebe.streamprocessor.StreamProcessorMode
import io.camunda.zeebe.stream.api.CommandResponseWriter
import io.camunda.zeebe.stream.api.InterPartitionCommandSender
import io.camunda.zeebe.stream.impl.StreamProcessor
import io.camunda.zeebe.stream.impl.StreamProcessorMode
import io.camunda.zeebe.util.FeatureFlags
import org.camunda.community.eze.db.EzeZeebeDbFactory
import java.nio.file.Files
Expand All @@ -38,7 +37,6 @@ object EzeStreamProcessorFactory {
val streamProcessor = StreamProcessor.builder()
.logStream(logStream.getZeebeLogStream())
.zeebeDb(zeebeDb)
.eventApplierFactory { EventAppliers(it) }
.commandResponseWriter(responseWriter)
.partitionCommandSender(createPartitionCommandSender(logStream))
.nodeId(0)
Expand Down
Original file line number Diff line number Diff line change
Expand Up @@ -9,9 +9,9 @@ package org.camunda.community.eze.engine

import io.camunda.zeebe.logstreams.storage.LogStorage
import io.camunda.zeebe.logstreams.storage.LogStorageReader
import io.camunda.zeebe.util.buffer.BufferUtil
import io.camunda.zeebe.util.buffer.BufferWriter
import org.agrona.DirectBuffer
import org.agrona.concurrent.UnsafeBuffer
import java.nio.ByteBuffer
import java.util.*
import java.util.concurrent.ConcurrentSkipListMap
import java.util.function.Function
Expand All @@ -37,11 +37,13 @@ class InMemoryLogStorage : LogStorage {
override fun append(
lowestPosition: Long,
highestPosition: Long,
blockBuffer: ByteBuffer,
bufferWriter: BufferWriter,
listener: LogStorage.AppendListener
) {
try {
val entry = Entry(blockBuffer)
val copy: DirectBuffer = BufferUtil.createCopy(bufferWriter)
Copy link
Collaborator

Choose a reason for hiding this comment

The reason will be displayed to describe this comment to others. Learn more.

@saig0 why do you think this was necessary now?

Copy link
Collaborator Author

Choose a reason for hiding this comment

The reason will be displayed to describe this comment to others. Learn more.

I kept the original structure of the in-memory log storage. Since the API changed, I needed to extract the content of the BufferWriter to fill the entry.

But maybe, there is another way. I'm open to your ideas. 😄


val entry = Entry(copy)
entries.add(entry)
val index = entries.size
positionIndexMapping[lowestPosition] = index
Expand All @@ -54,7 +56,7 @@ class InMemoryLogStorage : LogStorage {
}
}

private data class Entry(val data: ByteBuffer)
private data class Entry(val data: DirectBuffer)

private inner class ListLogStorageReader : LogStorageReader {

Expand Down Expand Up @@ -83,7 +85,7 @@ class InMemoryLogStorage : LogStorage {
}
val index = currentIndex
currentIndex++
return UnsafeBuffer(entries[index].data)
return entries[index].data
}


Expand Down
Original file line number Diff line number Diff line change
Expand Up @@ -7,65 +7,68 @@
*/
package org.camunda.community.eze.engine

import io.camunda.zeebe.engine.api.InterPartitionCommandSender
import io.camunda.zeebe.logstreams.log.LogStreamRecordWriter
import io.camunda.zeebe.logstreams.log.LogAppendEntry
import io.camunda.zeebe.logstreams.log.LogStreamWriter
import io.camunda.zeebe.protocol.impl.record.RecordMetadata
import io.camunda.zeebe.protocol.impl.record.UnifiedRecordValue
import io.camunda.zeebe.protocol.record.RecordType
import io.camunda.zeebe.protocol.record.ValueType
import io.camunda.zeebe.protocol.record.intent.Intent
import io.camunda.zeebe.util.buffer.BufferWriter
import java.util.function.Consumer
import io.camunda.zeebe.stream.api.InterPartitionCommandSender
import java.util.function.Supplier

class SinglePartitionCommandSender(
private val writerLookUp: (Int) -> LogStreamRecordWriter
private val writerLookUp: (Int) -> LogStreamWriter
) : InterPartitionCommandSender {

private fun withRecordWriter(
receiverPartitionId: Int,
writer: Consumer<LogStreamRecordWriter>
writer: Supplier<LogAppendEntry>
) {
val recordWriter = writerLookUp(receiverPartitionId)
writer.accept(recordWriter)
recordWriter.tryWrite()
val entry = writer.get()
recordWriter.tryWrite(entry)
}

override fun sendCommand(
receiverPartitionId: Int,
valueType: ValueType,
intent: Intent,
command: BufferWriter
valueType: ValueType?,
intent: Intent?,
command: UnifiedRecordValue?
) {
withRecordWriter(receiverPartitionId) { writer ->
withRecordWriter(receiverPartitionId) {
val recordMetadata =
RecordMetadata()
.recordType(RecordType.COMMAND)
.valueType(valueType)
.intent(intent)

writer
.metadataWriter(recordMetadata)
.valueWriter(command)
LogAppendEntry.of(
recordMetadata,
command
)
}
}

override fun sendCommand(
receiverPartitionId: Int,
valueType: ValueType,
intent: Intent,
valueType: ValueType?,
intent: Intent?,
recordKey: Long,
command: BufferWriter
command: UnifiedRecordValue?
) {
withRecordWriter(receiverPartitionId) { writer ->
withRecordWriter(receiverPartitionId) {
val recordMetadata =
RecordMetadata()
.recordType(RecordType.COMMAND)
.valueType(valueType)
.intent(intent)

writer
.key(recordKey)
.metadataWriter(recordMetadata)
.valueWriter(command)
LogAppendEntry.of(
recordKey,
recordMetadata,
command
)
}
}
}
Original file line number Diff line number Diff line change
Expand Up @@ -7,7 +7,8 @@
*/
package org.camunda.community.eze.grpc

import io.camunda.zeebe.engine.api.CommandResponseWriter
import io.camunda.zeebe.stream.api.CommandResponseWriter


class EzeGateway(
private val responseWriter: GrpcResponseWriter,
Expand Down
Original file line number Diff line number Diff line change
Expand Up @@ -7,14 +7,14 @@
*/
package org.camunda.community.eze.grpc

import io.camunda.zeebe.logstreams.log.LogStreamRecordWriter
import io.camunda.zeebe.logstreams.log.LogStreamWriter
import io.grpc.ServerBuilder

object EzeGatewayFactory {

fun createGateway(
port: Int,
streamWriter: LogStreamRecordWriter
streamWriter: LogStreamWriter
): EzeGateway {

val gateway = GrpcToLogStreamGateway(writer = streamWriter)
Expand Down
Original file line number Diff line number Diff line change
Expand Up @@ -10,7 +10,6 @@ package org.camunda.community.eze.grpc
import com.google.protobuf.GeneratedMessageV3
import com.google.rpc.Code
import com.google.rpc.Status
import io.camunda.zeebe.engine.api.CommandResponseWriter
import io.camunda.zeebe.gateway.protocol.GatewayOuterClass
import io.camunda.zeebe.protocol.impl.encoding.MsgPackConverter
import io.camunda.zeebe.protocol.impl.record.value.deployment.DeploymentRecord
Expand All @@ -24,6 +23,7 @@ import io.camunda.zeebe.protocol.record.RejectionType
import io.camunda.zeebe.protocol.record.ValueType
import io.camunda.zeebe.protocol.record.intent.Intent
import io.camunda.zeebe.protocol.record.intent.JobIntent
import io.camunda.zeebe.stream.api.CommandResponseWriter
import io.camunda.zeebe.util.buffer.BufferUtil
import io.camunda.zeebe.util.buffer.BufferWriter
import org.agrona.DirectBuffer
Expand Down
Loading