1:Fetcher类
此类负责管理消费者与broker之间的消息拉取(fetch)过程。
Fetcher类的主要功能是发送FetchRequest请求,获取指定的消息集合,处理FetchResponse,并更新消费位置。
1.1:属性
// Network client used to send requests to the brokers.
private final ConsumerNetworkClient client;
// Time source. NOTE(review): its usage is not visible in this excerpt.
private final Time time;
// Passed to FetchRequest.Builder.forConsumer(). Per the original author's note, the broker does not
// answer a FetchRequest immediately but waits until at least minBytes of data can be returned, so
// that each FetchResponse carries several messages, which improves efficiency.
private final int minBytes;
// Passed to the fetch request via setMaxBytes(): the byte limit for one fetch request.
private final int maxBytes;
// Passed to FetchRequest.Builder.forConsumer(): the longest time to wait for a FetchResponse.
private final int maxWaitMs;
// Maximum number of bytes per fetch operation (per the original note; usage not visible here).
private final int fetchSize;
// Back-off time before a retry (per the original note; usage not visible here).
private final long retryBackoffMs;
// Timeout of a fetch request (per the original note; usage not visible here).
private final long requestTimeoutMs;
// Maximum number of records handed out by a single fetchedRecords() call.
private final int maxPollRecords;
// NOTE(review): presumably toggles CRC validation of fetched records - confirm against parseCompletedFetch.
private final boolean checkCrcs;
// Metadata of the Kafka cluster.
private final Metadata metadata;
// Fetch metrics; sendFetches() records the request latency through it.
private final FetchManagerMetrics sensors;
// Tracks the topic subscriptions and, per TopicPartition, the consumption state such as offsets.
private final SubscriptionState subscriptions;
// Thread-safe queue of completed fetches: filled by the response listener in sendFetches(),
// drained by fetchedRecords().
private final ConcurrentLinkedQueue<CompletedFetch> completedFetches;
// NOTE(review): presumably supplies buffers for decompressing record batches - usage not visible here.
private final BufferSupplier decompressionBufferSupplier = BufferSupplier.create();
// Deserializers for the record key and the record value.
private final ExtendedDeserializer<K> keyDeserializer;
private final ExtendedDeserializer<V> valueDeserializer;
// Isolation level set on every fetch request built by sendFetches().
private final IsolationLevel isolationLevel;
// Fetch session handlers, looked up by node id in sendFetches().
private final Map<Integer, FetchSessionHandler> sessionHandlers;
// NOTE(review): presumably caches an exception raised by a list-offsets request so it can be
// rethrown later - usage not visible here.
private final AtomicReference<RuntimeException> cachedListOffsetsException = new AtomicReference<>();
// PartitionRecords holds the parsed result of a CompletedFetch. Per the original author's note it
// has three main fields:
/*
private long fetchOffset; // offset of the first message in records
private TopicPartition partition; // the TopicPartition the records belong to
private List<ConsumerRecord<K, V>> records; // the messages
*/
// The parsed partition records currently being handed out by fetchedRecords(): which partition,
// the consumed offset, the number of records and similar bookkeeping.
private PartitionRecords nextInLineRecords = null;
1.2:方法
- OffsetData:静态内部类(并非方法),用于保存broker返回的偏移量及其对应的时间戳
- sendFetches:向分配分区的所在节点发送fetch请求,拉取数据
- fetchedRecords: 返回每个分区已拉取到的记录,清空记录缓冲区并更新消费位置(consumed position)。
/**
 * Immutable holder for an offset reported by a broker together with the
 * timestamp that belongs to it.
 */
private static class OffsetData {
    /** The offset returned by the broker. */
    final long offset;

    /** The matching timestamp, or {@code null} if the broker does not support returning timestamps. */
    final Long timestamp;

    /**
     * @param brokerOffset    the offset returned by the broker
     * @param brokerTimestamp the timestamp returned by the broker, may be {@code null}
     */
    OffsetData(long brokerOffset, Long brokerTimestamp) {
        offset = brokerOffset;
        timestamp = brokerTimestamp;
    }
}
向分配分区的所在节点发送fetch请求,拉取数据:sendFetches
/**
 * Builds one fetch request for every node returned by {@code prepareFetchRequests()}, hands it to
 * the network client and registers a listener that turns the response into {@code CompletedFetch}
 * entries on the {@code completedFetches} queue.
 *
 * @return the number of fetch requests handed to the network client
 */
public int sendFetches() {
    // Node -> fetch session request data. Per the original author's note, requests are only
    // prepared for nodes that own assigned partitions and have no fetch already in flight.
    Map<Node, FetchSessionHandler.FetchRequestData> fetchRequestMap = prepareFetchRequests();
    for (Map.Entry<Node, FetchSessionHandler.FetchRequestData> entry : fetchRequestMap.entrySet()) {
        // The broker node this request goes to.
        final Node fetchTarget = entry.getKey();
        // The fetch session data prepared for that node.
        final FetchSessionHandler.FetchRequestData data = entry.getValue();
        // Build the fetch request from the session data.
        final FetchRequest.Builder request = FetchRequest.Builder
                .forConsumer(this.maxWaitMs, this.minBytes, data.toSend())
                .isolationLevel(isolationLevel)
                .setMaxBytes(this.maxBytes) // byte limit for one fetch (the original note cites 50 MB)
                .metadata(data.metadata())
                .toForget(data.toForget());
        if (log.isDebugEnabled()) {
            log.debug("Sending {} {} to broker {}", isolationLevel, data.toString(), fetchTarget);
        }
        // Hand the request to the network client (per the original note it is buffered in the
        // client's unsent queue). The listener below is the entry point for response handling.
        client.send(fetchTarget, request)
                .addListener(new RequestFutureListener<ClientResponse>() {
                    @Override
                    public void onSuccess(ClientResponse resp) {
                        FetchResponse response = (FetchResponse) resp.responseBody();
                        FetchSessionHandler handler = sessionHandlers.get(fetchTarget.id());
                        if (handler == null) {
                            log.error("Unable to find FetchSessionHandler for node {}. Ignoring fetch response.",
                                    fetchTarget.id());
                            return;
                        }
                        // The session handler decides whether the response is usable at all.
                        if (!handler.handleResponse(response)) {
                            return;
                        }
                        // Snapshot of the partitions contained in the response.
                        Set<TopicPartition> partitions = new HashSet<>(response.responseData().keySet());
                        FetchResponseMetricAggregator metricAggregator = new FetchResponseMetricAggregator(sensors, partitions);
                        for (Map.Entry<TopicPartition, FetchResponse.PartitionData> responseEntry : response.responseData().entrySet()) {
                            TopicPartition partition = responseEntry.getKey();
                            // A response partition that is not part of the session's request partitions
                            // would otherwise fail below with a bare NullPointerException; fail with a
                            // message that says what went wrong instead.
                            if (data.sessionPartitions().get(partition) == null) {
                                throw new IllegalStateException("Response for missing session request partition: partition="
                                        + partition + "; metadata=" + data.metadata());
                            }
                            // The offset this partition was requested at.
                            long fetchOffset = data.sessionPartitions().get(partition).fetchOffset;
                            // The returned partition data, including the fetched records.
                            FetchResponse.PartitionData fetchData = responseEntry.getValue();
                            log.debug("Fetch {} at offset {} for partition {} returned fetch data {}",
                                    isolationLevel, fetchOffset, partition, fetchData);
                            // Buffer the result; fetchedRecords() parses and drains this queue.
                            completedFetches.add(new CompletedFetch(partition, fetchOffset, fetchData, metricAggregator,
                                    resp.requestHeader().apiVersion()));
                        }
                        sensors.fetchLatency.record(resp.requestLatencyMs());
                    }

                    @Override
                    public void onFailure(RuntimeException e) {
                        // Let the fetch session for this node react to the failure.
                        FetchSessionHandler handler = sessionHandlers.get(fetchTarget.id());
                        if (handler != null) {
                            handler.handleError(e);
                        }
                    }
                });
    }
    return fetchRequestMap.size();
}
fetchedRecords: 返回每个分区已拉取到的记录,清空记录缓冲区并更新消费位置(consumed position)。
/**
 * Drains buffered fetch results into a per-partition map, handing out at most
 * {@code maxPollRecords} records in one call.
 *
 * @return the records collected in this call, keyed by partition; empty if nothing was buffered
 * @throws KafkaException if one is raised while nothing has been collected yet in this call
 */
public Map<TopicPartition, List<ConsumerRecord<K, V>>> fetchedRecords() {
    Map<TopicPartition, List<ConsumerRecord<K, V>>> drained = new HashMap<>();
    // How many more records this call may still hand out.
    int budget = maxPollRecords;
    try {
        while (budget > 0) {
            boolean needsNextFetch = nextInLineRecords == null || nextInLineRecords.isFetched;
            if (needsNextFetch) {
                // Look at the head of the queue without removing it yet.
                CompletedFetch head = completedFetches.peek();
                if (head == null) {
                    // Nothing buffered any more.
                    break;
                }
                // Parse first and remove afterwards: if parsing throws, the completed fetch
                // stays at the head of the queue.
                nextInLineRecords = parseCompletedFetch(head);
                completedFetches.poll();
                continue;
            }

            // Take up to `budget` records from the current partition records.
            List<ConsumerRecord<K, V>> batch = fetchRecords(nextInLineRecords, budget);
            TopicPartition tp = nextInLineRecords.partition;
            if (batch.isEmpty()) {
                continue;
            }

            List<ConsumerRecord<K, V>> alreadyCollected = drained.get(tp);
            if (alreadyCollected != null) {
                // this case shouldn't usually happen because we only send one fetch at a time per partition,
                // but it might conceivably happen in some rare cases (such as partition leader changes).
                // we have to copy to a new list because the old one may be immutable
                List<ConsumerRecord<K, V>> merged = new ArrayList<>(batch.size() + alreadyCollected.size());
                merged.addAll(alreadyCollected);
                merged.addAll(batch);
                drained.put(tp, merged);
            } else {
                drained.put(tp, batch);
            }
            // Charge the records just collected against the remaining budget.
            budget -= batch.size();
        }
    } catch (KafkaException e) {
        // Only propagate when there is nothing to return; otherwise the records collected so far
        // are returned and the exception is dropped here (presumably it surfaces on a later call).
        if (drained.isEmpty()) {
            throw e;
        }
    }
    return drained;
}
文章评论