KNet: JVM callbacks
One of the features of JCOBridge, used in KNet, is the callback management from JVM. Many applications use the callback mechanism to be informed about events which happens during execution. Apache Kakfa exposes many API which have callbacks in the parameters. The Java code of a callback can be written with lambda expressions, but KNet cannot, it needs an object.
KNet Callback internals
KNet is based on JCOBridge. JCOBridge as per its name is a bridge between the CLR (CoreCLR) and the JVM. Events, generally are expressed as interfaces in Java, and a lambda expression is translated into an object at compile time. Otherwise the developer can implement a Java class which implements the interface: with JCOBridge the developer needs to follow a seamless approach. In KNet some callbacks are ready made. In this tutorial the Callback interface (org.apache.kafka.clients.producer.Callback) will be taken as an example. The concrete class implementing the interface is the following one:
public final class CallbackImpl extends JCListener implements Callback {
public CallbackImpl(String key) throws JCNativeException {
super(key);
}
@Override
public void onCompletion(org.apache.kafka.clients.producer.RecordMetadata metadata, Exception exception) {
raiseEvent("onCompletion", metadata, exception);
}
}
The structure follows the guidelines of JCOBridge:
- It must
extends
the base classJCListener
(orimplements
the interfaceIJCListener
): this is a constraint of JCOBridge;JCListener
has many ready made methods; if the callback is not based on an interface the developer canimplements
theIJCListener
; - The concrete class must have at least a constructor accepting a String;
- Within the implementation of the interface method (in this case the method
onCompletion
of theCallback
interface) the methodraiseEvent
informs the CLR that a method was raised using the specific key (onCompletion in this case) along with all associated objects:- If the interface has many methods each one must have its own
raiseEvent
call; - The key used from raiseEvent is not mandatory to be equal to the name of the calling method, it is only a convention for the mapping: this will be more clear looking at the C# code.
- If the interface has many methods each one must have its own
Now there is a concrete class within the JVM space. Going on to the CLR side a possible concrete class in C# is as the following one:
public class Callback : CLRListener
{
public sealed override string JniClass => "org.mases.kafkabridge.clients.producer.CallbackImpl";
readonly Action<RecordMetadata, JVMBridgeException> executionFunction = null;
public virtual Action<RecordMetadata, JVMBridgeException> Execute { get { return executionFunction; } }
public Callback(Action<RecordMetadata, JVMBridgeException> func = null)
{
if (func != null) executionFunction = func;
else executionFunction = OnCompletion;
AddEventHandler("onCompletion", new EventHandler<CLRListenerEventArgs<JVMBridgeEventData<RecordMetadata>>>(EventHandler));
}
void EventHandler(object sender, CLRListenerEventArgs<JVMBridgeEventData<RecordMetadata>> data)
{
var exception = data.EventData.ExtraData.Get(0) as IJavaObject;
Execute(data.EventData.TypedEventData, JVMBridgeException.New(exception));
}
public virtual void OnCompletion(RecordMetadata metadata, JVMBridgeException exception) { }
}
The structure follows the guidelines of JCOBridge:
- It must
extends
the base classCLRListener
: this is a constraint of JCOBridge;CLRListener
contains all the functionality to handle events from the JVM; - The
JniClass
property informs the base class about the concrete class in JVM associated to this event handler; - Within the constructor the method
AddEventHandler
registers a .NETEventHandler
associated to the method in JVM; look at the key string: it is the same used from the JVM;- The costructor of the code above accept in input an
Action
which permits to write lambda expression in C#; - The code above associate a private handler with specific data type:
CLRListenerEventArgs
is mandatory and it is used fromCLRListener
;JVMBridgeEventData
informs the subsystem that the first parameter inherits from aJVMBridgeBase
class and must be treated accordingly;RecordMetadata
represents the CLR version of the correspondingRecordMetadata
within the JVM;
- The costructor of the code above accept in input an
- On callback invocation (
onCompletion
in this case) the CLR will invokeEventHandler
:- The first parameter is directly reported using the
TypedEventData
property; - The second parameter shall be managed differently in this case because it is an
Exception
:- JCOBridge has its own mechanism to translate the exception from the JVM;
- Parameters raised from JVM, beyond the first, are available within
ExtraData
property; - The code extracts the first object (the second of the event) and converts it into a generic
IJavaObject
; - Invoking
JVMBridgeException.New(exception)
the subsystem reads the data from the real JVM exception and try to instantiate a valid exception, otherwise returns a genericJVMBridgeException
;JVMBridgeException.New(exception)
can return null if the extracted data is not anIJavaObject
or it is null within the JVM;
- The first parameter is directly reported using the
- Other pieces of the class are useful in other condition:
- Creating a new class extending
Callback
class, the methodOnCompletion
can be overridden; - Otherwise to the property
Execute
can be associated to an handler;
- Creating a new class extending
KNet Callback lifecycle
The lifecycle of the callback managed from JCOBridge is slightly different from the standard one.
A CLRListener
to be able to live without to be recovered from the Garbage Collector shall be registered. CLRListener
do this automatically within the initialization (this behavior can be avoided with the property AutoInit
).
So at the end of its use it must be disposed to avoid a resource leak. In the Producer with Callback example there is a using clause and the class is instantiated only one time.
A correct approach is like the following:
using (var callback = new Callback((o1, o2) =>
{
if (o2 != null) Console.WriteLine(o2.ToString());
else Console.WriteLine($"Produced on topic {o1.Topic} at offset {o1.Offset}");
}))
{
while (!resetEvent.WaitOne(0))
{
var record = new ProducerRecord<string, string>(topicToUse, i.ToString(), i.ToString());
var result = producer.Send(record, callback);
Console.WriteLine($"Producing: {record} with result: {result.Get()}");
producer.Flush();
i++;
}
}
while an approach like the following one:
var result = producer.Send(record, new Callback((o1, o2) =>
{
if (o2 != null) Console.WriteLine(o2.ToString());
else Console.WriteLine($"Produced on topic {o1.Topic} at offset {o1.Offset}");
}));
there are two main drawbacks:
- it creates a resource leak because the object cannot be programmatically disposed;
- on each cycle, the engine shall allocate the infrastructure to handle events from the JVM.