最新消息:雨落星辰是一个专注网站SEO优化、网站SEO诊断、搜索引擎研究、网络营销推广、网站策划运营及站长类的自媒体原创博客

Fabric8 Kubernetes 日志工具实践

网站源码admin2浏览0评论

Fabric8 Kubernetes 日志工具实践

最近在使用 Fabric8 Kubernetes Client 的过程中发现了新大陆一样,感觉利用这个库可以进行很多有趣的功能尝试,其中一个便是日志的本地化。

原因无他,rancher 页面性能实在太差了,经常性的暂停工作,碰到故障排查的时候,着实让人恼火。当我看到 Fabric8 Kubernetes Client 的日志相关 API 的时候我就立刻冒出来写一个日志小工具的想法。

API简介

首先我们简单介绍一下 API,以方便快速进入场景。后续等我自觉学得差不多了,再来列个专题给大家分享 Fabric8 Kubernetes Client 的全部 API 实践经验。

在本次分享当中,主要用到了两种日志 API:getLog()watchLog()

以下是 Fabric8 Kubernetes Client 日志功能的结构化总结:

功能点与 API 对照表

首选日志流

对于日志需求来讲,流式调用自然是最好不过了,可以及时获取最新的日志信息,还不用后期干预。这里我选择了watchLog()(无参调用),watchLog() 返回一个 LogWatch 实例,该实例包含 getOutput() 方法,可获取日志流。适用于 手动解析日志流,比 watchLog(System.out) 更灵活。

代码语言:javascript代码运行次数:0运行复制
import com.auto.fault.framework.funtester.frame.SourceCode
import groovy.util.logging.Log4j2
import io.fabric8.kubernetes.client.DefaultKubernetesClient
import io.fabric8.kubernetes.client.KubernetesClient
import io.fabric8.kubernetes.client.dsl.LogWatch

@Log4j2
class TesClient extends SourceCode {

    public static void main(String[] args) {
        try (KubernetesClient client = new DefaultKubernetesClient();
             def pods = client.pods().inNamespace("test").list()
             def marketPod = pods.getItems().find {
                 it.getMetadata().getName().contains("FunTester-pod")
             }.getMetadata().getName();
             LogWatch logWatch = client.pods()
                     .inNamespace("FunTester-default")
                     .withName("FunTester-mypod")
                     .watchLog()) {
            logWatch.getOutput().eachLine {
                System.out.println("Pod Log: " + it);
            }
        } catch (Exception e) {
            e.printStackTrace();
        }
        waitForKey("Press any key to exit")
    }
}


但是在实际的使用当中,经过几分钟,最长不超过十几分钟之后,流里面居然不再输出日志信创了,感觉很奇怪。经过查询资料和多方面验证,依旧没有解决问题,rancher 自带的WebSocket 推送日志也遇到这个问题。最终还是把问题甩给了服务端。

目前这种方式只适用于调试过程中查看日志,使用的时候本地启动一个脚本用来实时展示日志的情况。

下面是我的封装方法,仅供参考:

代码语言:javascript代码运行次数:0运行复制
    /
     * 处理日志流, 通过 WatchLog 方式, 适用于实时日志, 适用于日志量较小的场景,可能会被中断
     * @param client
     * @param namespace
     * @param podName
     * @param consumer
     * @return
     */
    static def handlePodLogFlow(String namespace, String podName, Consumer<String> consumer) {
        try (def logWatch = K8sService.client.pods()
                .inNamespace(namespace)
                .withName(podName)
                .tailingLines(200)
                .watchLog()) {
            logWatch.getOutput().eachLine {
                consumer(it)
            }
        } catch (e) {
            log.error("handle log error: {}", e)
        }

    }
    

循环拉取

剩下的另外一个方式就是定时任务实现循环拉取日志了,用到了 sinceSeconds() 这个 API,逻辑也比较简单就是每隔一段时间拉取最近一段时间的日志。

但是在使用当中遇到一个问题,由于执行耗时以及网络原因,如果我每 10s 拉取最近 10s 的日志总会丢日志,如果拉取最近 11s 的日志,又会有一些重复的日志。

为了解决这个问题,我特意咨询了AI,给了下面三种思路。

  1. 基于日志 ID 去重:如果日志自带唯一标识(如 requestId、traceId),可以使用 集合(Set) 记录已处理的日志 ID,在处理新日志时先检查 ID 是否已存在,若已存在则跳过。这种方式精准高效,但 Set 可能会无限增长,需要定期清理,适用于日志 ID 唯一且可以长期存储 ID 记录的场景。
  2. 基于时间窗口去重:如果日志没有唯一 ID,但有时间戳,可以记录 上一次处理的最大时间戳,只处理时间戳 大于 这个值的日志,处理完成后更新最大时间戳。这种方式内存占用小,适用于 时间戳严格递增 的情况,但如果日志乱序,可能会丢失部分数据。
  3. 基于 LRU(最近最少使用)缓存去重:如果日志 ID 变化范围大,但不能无限存储已处理 ID,可以使用 固定大小的缓存(如 LRU 哈希表)存储最近 N 条已处理的日志 ID,超出部分自动删除。这种方式适用于 高并发、大量日志流入 的情况,能有效控制内存占用,但需要合理设置缓存大小,以平衡去重效果与资源消耗。

最终我选了基于时间窗户,日志返回的时间是毫秒时间戳,这样根据时间戳进行筛选,可以避免重复和丢日志的情况。

下面是我的封装代码:

代码语言:javascript代码运行次数:0运行复制
    /**
     * 处理日志流, 通过 getlogs 方式, 定时获取任务,避免中断
     * @param namespace
     * @param podName
     * @param consumer
     * @return
     */
    static def handleLogs(String namespace, String podName, Consumer<String> consumer) {
        long lastTime
        ThreadPoolUtil.scheduleRate({
            time {
                try (def reader = K8sService.client.pods()
                        .inNamespace(namespace)
                        .withName(podName)
                        .sinceSeconds(11)
                        .getLogReader()) {
                    def lines = reader.readLines()
                    boolean start = false
                    lines.each {
                        if (!start && getTimestamp(it) > lastTime) {
                            start = true
                        }
                        if (start) {
                            consumer(it)
                        }
                    }
                    lastTime = getTimestamp(lines.get(lines.size() - 1));
                } catch (Exception e) {
                    log.error("handle log error: {}", e)
                }
            }
        }, 10)

    }
    

使用方法如下:

代码语言:javascript代码运行次数:0运行复制
import com.auto.fault.framework.funtester.frame.SourceCode
import com.auto.fault.framework.utils.k8s.K8sLog
import groovy.util.logging.Log4j2
import io.fabric8.kubernetes.client.DefaultKubernetesClient
import io.fabric8.kubernetes.client.KubernetesClient

import java.util.concurrent.ScheduledFuture

@Log4j2
class TesClient extends SourceCode {

    public static void main(String[] args) {
        KubernetesClient client = new DefaultKubernetesClient()
        ScheduledFuture<?> logs = K8sLog.handleLogs("funtester", "funtester-0", {
            println it
        })
        waitForKey("Press any key to exit")
        logs.cancel(true)
        client.close()
    }
}

原来我也想通过一个去重的队列实现,发现 Java 本身并没有提供这个能力,如果单独写一个比较麻烦,得不偿失,最终也放弃了。

基于 LinkedHashMap

这里分享一下 LinkedHashMap 方案,因为 removeEldestEntry() 方法让我学到了新知识,本来我打算用 Caffeine 实现的,没想到 Java 还提供了替代方案。

基于插入顺序

代码语言:javascript代码运行次数:0运行复制
import java.util.*;

publicclass LogProcessor {
    privatestaticfinalint MAX_ENTRIES = 10000; // 只存最近的日志 ID
    privatestaticfinal LinkedHashMap<String, Long> processedLogs =
            new LinkedHashMap<>(MAX_ENTRIES, 0.75f) {
                @Override
                protected boolean removeEldestEntry(Map.Entry<String, Long> eldest) {
                    return size() > MAX_ENTRIES;
                }
            };

    public void processLogs(List<Log> logs) {
        long now = System.currentTimeMillis();

        for (Log log : logs) {
            if (processedLogs.containsKey(log.getId())) {
                continue;
            }
            processedLogs.put(log.getId(), now);
            processLog(log);
        }
    }

    private void processLog(Log log) {
        System.out.println("FunTester Processing log: " + log);
    }
}

removeEldestEntryLinkedHashMap 提供的一个受保护(protected)方法,用于控制缓存的大小。当 LinkedHashMap 作为 LRU 缓存(最近最少使用缓存) 使用时,可以重写该方法,在元素数量超过限制时自动移除最早插入的元素。

  • eldest 参数表示当前 LinkedHashMap 中 最老(最早插入)的键值对。
  • 返回值 true 时,eldest 会被移除;返回 false,则不会移除。

基于最近访问顺序

要根据访问时间删除 LinkedHashMap 中的旧数据,可以利用 LinkedHashMap 的 accessOrder=true 特性,让最近访问的数据排在后面,并在 removeEldestEntry 方法中检查数据的时间戳是否超时,超时则删除。适用于 基于时间的自动清理缓存,如日志、会话管理等。

实现步骤

  1. 启用 LRU 访问顺序:创建 LinkedHashMap 时,设置 accessOrder=true,使最近访问的数据自动移动到队尾。
  2. 存储时间戳:在 LinkedHashMap 中存储键值对,值包含数据的时间戳。
  3. 按访问时间删除:在 removeEldestEntry 方法中,检查最老的元素是否超过设定的超时时间(如 10 分钟),如果超时,则返回 true 进行删除。

实现方法:

代码语言:javascript代码运行次数:0运行复制
import java.util.*;

publicclass AccessTimeCache<K, V> extends LinkedHashMap<K, V> {
    privatefinallong EXPIRATION_TIME_MS; // 过期时间,单位毫秒

    public AccessTimeCache(int capacity, long expirationTimeMs) {
        super(capacity, 0.75f, true); // accessOrder=true,启用 LRU
        this.EXPIRATION_TIME_MS = expirationTimeMs;
    }

    @Override
    protected boolean removeEldestEntry(Map.Entry<K, V> eldest) {
        long currentTime = System.currentTimeMillis();
        // 假设 V 是一个包含时间戳的对象,这里需替换成实际数据结构
        if (eldest.getValue() instanceof CacheItem) {
            CacheItem item = (CacheItem) eldest.getValue();
            return (currentTime - item.timestamp) > EXPIRATION_TIME_MS;
        }
        returnfalse;
    }

    // 模拟存储数据时的结构
    staticclass CacheItem {
        String data;
        long timestamp;

        public CacheItem(String data) {
            this.data = data;
            this.timestamp = System.currentTimeMillis();
        }
    }
}

当然我们还可以根据时间+条目总数来控制,这里就不再赘述了。

本文参与 腾讯云自媒体同步曝光计划,分享自微信公众号。原始发表:2025-04-21,如有侵权请联系 cloudcommunity@tencent 删除kubernetes缓存工具日志实践
发布评论

评论列表(0)

  1. 暂无评论