Skip to content

Commit

Permalink
fix: remove unused var
Browse files Browse the repository at this point in the history
  • Loading branch information
puuuuug authored and 正业 committed Jan 17, 2025
1 parent b44de90 commit 9123951
Showing 1 changed file with 1 addition and 4 deletions.
Original file line number Diff line number Diff line change
Expand Up @@ -62,8 +62,6 @@ public class KafkaTableSource extends AbstractTableSource {
private String topic;
private long startTimeMs;
private Properties props;
private String connectorFormat;
private TableSchema schema;
private Duration pollTimeout;

private transient KafkaConsumer<String, String> consumer;
Expand Down Expand Up @@ -93,7 +91,6 @@ public void init(Configuration conf, TableSchema tableSchema) {
if (pullSize <= 0) {
throw new GeaFlowDSLException("Config {} is illegal:{}", KafkaConfigKeys.GEAFLOW_DSL_KAFKA_PULL_BATCH_SIZE, pullSize);
}
this.schema = tableSchema;
this.props = new Properties();
props.setProperty(KafkaConstants.KAFKA_BOOTSTRAP_SERVERS, servers);
props.setProperty(KafkaConstants.KAFKA_KEY_DESERIALIZER,
Expand All @@ -103,7 +100,7 @@ public void init(Configuration conf, TableSchema tableSchema) {
props.setProperty(KafkaConstants.KAFKA_MAX_POLL_RECORDS,
String.valueOf(pullSize));
props.setProperty(KafkaConstants.KAFKA_GROUP_ID, groupId);
LOGGER.info("open kafka, servers is: {}, topic is:{}, config is:{}, schema is: {}, connector format is : {}",
LOGGER.info("open kafka, servers is: {}, topic is:{}, config is:{}, schema is: {}",
servers, topic, conf, tableSchema);
}

Expand Down

0 comments on commit 9123951

Please sign in to comment.