#!/usr/bin/env python
# Copyright (c) PLUMgrid, Inc.
# Licensed under the Apache License, Version 2.0 (the "License")

from bcc import BPF
import ctypes as ct
import random
import time
import subprocess
from bcc.utils import get_online_cpus
from unittest import main, TestCase


class TestArray(TestCase):
    """Tests for BPF_ARRAY tables and BPF_PERF_OUTPUT perf buffers."""

    def test_simple(self):
        """Write two slots using ctypes keys and read them back via items()."""
        b = BPF(text="""BPF_ARRAY(table1, u64, 128);""")
        t1 = b["table1"]
        t1[ct.c_int(0)] = ct.c_ulonglong(100)
        t1[ct.c_int(127)] = ct.c_ulonglong(1000)
        for i, v in t1.items():
            if i.value == 0:
                self.assertEqual(v.value, 100)
            if i.value == 127:
                self.assertEqual(v.value, 1000)
        # The table reports its declared size, not the number of slots written.
        self.assertEqual(len(t1), 128)

    def test_native_type(self):
        """Index the array with plain Python ints, including negative ones."""
        b = BPF(text="""BPF_ARRAY(table1, u64, 128);""")
        t1 = b["table1"]
        t1[0] = ct.c_ulonglong(100)
        t1[-2] = ct.c_ulonglong(37)
        t1[127] = ct.c_ulonglong(1000)
        for i, v in t1.items():
            if i.value == 0:
                self.assertEqual(v.value, 100)
            if i.value == 127:
                self.assertEqual(v.value, 1000)
        self.assertEqual(len(t1), 128)
        self.assertEqual(t1[-2].value, 37)
        # Index -1 must refer to the same slot as index 127.
        self.assertEqual(t1[-1].value, t1[127].value)

    def test_perf_buffer(self):
        """Check that a nanosleep kprobe delivers at least one perf event."""
        self.counter = 0

        class Data(ct.Structure):
            _fields_ = [("ts", ct.c_ulonglong)]

        # NOTE(review): cb/lost_cb are invoked by bcc while polling; confirm
        # that assertion failures raised inside them reach the test runner.
        def cb(cpu, data, size):
            self.assertGreater(size, ct.sizeof(Data))
            event = ct.cast(data, ct.POINTER(Data)).contents
            self.counter += 1

        def lost_cb(lost):
            self.assertGreater(lost, 0)

        text = """ BPF_PERF_OUTPUT(events); int do_sys_nanosleep(void *ctx) { struct { u64 ts; } data = {bpf_ktime_get_ns()}; events.perf_submit(ctx, &data, sizeof(data)); return 0; } """
        b = BPF(text=text)
        # Register teardown immediately so the probes are released even when
        # an attach, the poll or the final assertion fails.
        self.addCleanup(b.cleanup)
        b.attach_kprobe(event=b.get_syscall_fnname("nanosleep"),
                        fn_name="do_sys_nanosleep")
        b.attach_kprobe(event=b.get_syscall_fnname("clock_nanosleep"),
                        fn_name="do_sys_nanosleep")
        b["events"].open_perf_buffer(cb, lost_cb=lost_cb)
        # Run `sleep` so that one of the probed syscalls fires.
        subprocess.call(['sleep', '0.1'])
        b.perf_buffer_poll()
        self.assertGreater(self.counter, 0)

    def test_perf_buffer_for_each_cpu(self):
        """Check that events are received when sleeping on every online CPU."""
        self.events = []

        class Data(ct.Structure):
            _fields_ = [("cpu", ct.c_ulonglong)]

        # NOTE(review): cb/lost_cb are invoked by bcc while polling; confirm
        # that assertion failures raised inside them reach the test runner.
        def cb(cpu, data, size):
            self.assertGreater(size, ct.sizeof(Data))
            event = ct.cast(data, ct.POINTER(Data)).contents
            self.events.append(event)

        def lost_cb(lost):
            self.assertGreater(lost, 0)

        text = """ BPF_PERF_OUTPUT(events); int do_sys_nanosleep(void *ctx) { struct { u64 cpu; } data = {bpf_get_smp_processor_id()}; events.perf_submit(ctx, &data, sizeof(data)); return 0; } """
        b = BPF(text=text)
        # Register teardown immediately so the probes are released even when
        # an attach, a subprocess call or the poll raises.
        self.addCleanup(b.cleanup)
        b.attach_kprobe(event=b.get_syscall_fnname("nanosleep"),
                        fn_name="do_sys_nanosleep")
        b.attach_kprobe(event=b.get_syscall_fnname("clock_nanosleep"),
                        fn_name="do_sys_nanosleep")
        b["events"].open_perf_buffer(cb, lost_cb=lost_cb)
        online_cpus = get_online_cpus()
        # Pin one short `sleep` to each online CPU in turn.
        for cpu in online_cpus:
            subprocess.call(['taskset', '-c', str(cpu), 'sleep', '0.1'])
        b.perf_buffer_poll()
        self.assertGreaterEqual(
            len(self.events), len(online_cpus),
            'Received only {}/{} events'.format(len(self.events),
                                                len(online_cpus)))


if __name__ == "__main__":
    main()