KNet: Connect SDK
This is only a quick start guide for the KNet Connect SDK; further information on Apache Kafka™ Connect is available at https://kafka.apache.org/documentation/#connect
Important
Starting from KNet 3.0.3, the infrastructure of the KNet Connect SDK supports both the .NET hosted runtime and the JVM hosted runtime. Support for the JVM hosted runtime guarantees that connectors written in .NET languages can be deployed by following the official documentation.
Code structure
The code for a connector based on KNet Connect SDK follows the same rules for both .NET and the JVM hosted runtimes.
Tip
Except for some specific method signatures and additional properties, the rules that apply to connectors, transformations and predicates developed for the JVM also apply to those built with the KNet Connect SDK.
Source connector
A source connector shall be defined extending the following class:
public abstract class KNetSourceConnector<TSourceConnector, TTask> : KNetConnector<TSourceConnector>
where TSourceConnector : KNetSourceConnector<TSourceConnector, TTask>
where TTask : KNetSourceTask<TTask>
where the TTask type shall extend the following class:
public abstract class KNetSourceTask<TTask> : KNetTask<TTask>
where TTask : KNetSourceTask<TTask>
The mandatory method to be implemented is:
public abstract System.Collections.Generic.IList<SourceRecord> Poll();
which returns a list of SourceRecord. Each SourceRecord can be created directly or by using the available CreateRecord helper methods.
Tip
Starting from KNet 3.0.3, there is an alternative to the standard invocation, where the SourceRecords are returned together at the end of the Poll invocation and then pushed to the JVM:
with the CreateAndPushRecord helper methods, each SourceRecord is created and pushed to the JVM immediately.
This approach takes advantage of idle time while the KNetSourceTask<TTask> is waiting to receive data, e.g. from a socket or disk access.
Tip
KNet 3.1.2 introduces, beside the CreateAndPushRecord helper methods, new CreateAndPushRecordAsync helper methods which can push records to the JVM outside the Poll invocation.
These helpers decouple the Poll invocation from SourceRecord generation and are useful if the .NET code takes advantage of the async/await pattern or similar. If UseOnlyAsync is set to true,
the Poll method is never invoked and everything happens asynchronously from the .NET point of view (the JVM side continues to invoke the poll method, but the returned list has been filled in the meantime).
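The decoupling described in this tip can be pictured with a small, self-contained Java model. This is only a conceptual sketch, not KNet code: the class AsyncRecordBuffer and its methods pushAsync and poll are hypothetical names, and plain strings stand in for SourceRecord. Records are queued whenever they become available, and the periodic poll call merely drains what has accumulated in the meantime.

```java
import java.util.ArrayList;
import java.util.List;
import java.util.concurrent.ConcurrentLinkedQueue;

// Hypothetical model of "push outside Poll": NOT part of KNet or Apache Kafka Connect.
public class AsyncRecordBuffer {
    // Thread-safe queue filled by asynchronous producers.
    private final ConcurrentLinkedQueue<String> pending = new ConcurrentLinkedQueue<>();

    // Called from any thread as soon as a record is ready (stands in for an async push).
    public void pushAsync(String record) {
        pending.add(record);
    }

    // Called periodically by the polling side: returns everything queued since the last call.
    public List<String> poll() {
        List<String> batch = new ArrayList<>();
        String record;
        while ((record = pending.poll()) != null) {
            batch.add(record);
        }
        return batch;
    }

    public static void main(String[] args) throws InterruptedException {
        AsyncRecordBuffer buffer = new AsyncRecordBuffer();
        // A background thread produces records independently of any poll call.
        Thread producer = new Thread(() -> {
            for (int i = 0; i < 3; i++) {
                buffer.pushAsync("record-" + i);
            }
        });
        producer.start();
        producer.join();
        System.out.println(buffer.poll()); // the three queued records
        System.out.println(buffer.poll()); // nothing new since the previous poll
    }
}
```

The polling side never blocks waiting for data in this model, which mirrors the idea that record generation and the poll cycle proceed independently.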
Sink connector
A sink connector shall be defined extending the following class:
public abstract class KNetSinkConnector<TSinkConnector, TTask> : KNetConnector<TSinkConnector>
where TSinkConnector : KNetSinkConnector<TSinkConnector, TTask>
where TTask : KNetSinkTask<TTask>
where the TTask type shall extend the following class:
public abstract class KNetSinkTask<TTask> : KNetTask<TTask>
where TTask : KNetSinkTask<TTask>
The mandatory method to be implemented is:
public abstract void Put(IEnumerable<SinkRecord> collection);
which receives the set of SinkRecord to be processed.
Predicates
A predicate shall be defined extending the following class:
public abstract class KNetPredicate : KNetCommon, IKNetPredicate
The methods that can be implemented are:
public virtual bool Test(ConnectRecord record);
or, alternatively:
- Source predicate
public virtual bool Test(SourceRecord record);
- Sink predicate
public virtual bool Test(SinkRecord record);
Transformation
A transformation shall be defined extending the following class:
public abstract class KNetTransformation : KNetCommon, IKNetTransformation
The methods that can be implemented are:
public virtual ConnectRecord Apply(ConnectRecord record);
or, alternatively:
- Source transformation
public virtual SourceRecord Apply(SourceRecord record);
- Sink transformation
public virtual SinkRecord Apply(SinkRecord record);
Programmatically properties overrides
Starting from KNet 3.1.2, some mandatory configuration properties can be overridden at code level, avoiding their declaration in the configuration. To do so, extend one of
- org.mases.knet.developed.connect.sink.KNetSinkConnector
- org.mases.knet.developed.connect.source.KNetSourceConnector
- org.mases.knet.developed.connect.transforms.KNetTransformation
- org.mases.knet.developed.connect.transforms.predicates.KNetPredicate
and override some of the methods declared in the org.mases.knet.developed.connect.KNetConnectInitializer interface.
The example below is based on a generic source connector named MySourceConnector.
In .NET, the connector is defined as:
namespace MyConnectorNamespace
{
    public class MySourceConnector : KNetSourceConnector<MySourceConnector, MySourceTask>
    {
        // other methods
    }

    public class MySourceTask : KNetSourceTask<MySourceTask>
    {
        // other methods
    }
}
Before KNet 3.1.2, the connector is identified through the following property, as described in Configuration properties:
knet.dotnet.classname=MyConnectorNamespace.MySourceConnector, MySourceConnectorAssembly
From KNet 3.1.2, the JVM side exposes two new methods, getAssemblyLocation and getClassName.
When these methods are overridden in a child class of one of
- org.mases.knet.developed.connect.sink.KNetSinkConnector
- org.mases.knet.developed.connect.source.KNetSourceConnector
- org.mases.knet.developed.connect.transforms.KNetTransformation
- org.mases.knet.developed.connect.transforms.predicates.KNetPredicate
the properties are defined at code level.
Here is an example of a Java class written specifically to manage the .NET connector (the example is for a source connector, but the same approach applies to sink connectors, predicates and transformations):
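package myconnectorpackage;

import org.apache.kafka.common.config.ConfigDef;
import org.apache.kafka.connect.connector.Task;
import org.mases.knet.developed.connect.source.KNetSourceConnector;
import org.slf4j.Logger;
import org.slf4j.LoggerFactory;

public class MySourceConnector extends KNetSourceConnector {
    private static final Logger log = LoggerFactory.getLogger(MySourceConnector.class);
    private static final String VERSION = "1.0.0";
    // .NET class in the form "FullName, AssemblyName"
    private static final String DOTNET_CLASSNAME = "MyConnectorNamespace.MySourceConnector, MySourceConnectorAssembly";
    // add here specific configuration properties
    public static final ConfigDef CONFIG_DEF = new ConfigDef(KNetSourceConnector.CONFIG_DEF);

    public MySourceConnector() {
        super();
    }

    @Override
    public String version() {
        return VERSION;
    }

    @Override
    public Class<? extends Task> taskClass() {
        // Java-side task class, not shown in this guide
        return MySourceTask.class;
    }

    @Override
    public ConfigDef config() {
        return CONFIG_DEF;
    }

    // replaces the knet.dotnet.classname configuration property
    @Override
    public String getClassName() {
        return DOTNET_CLASSNAME;
    }
}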
This behavior helps to define a fixed value that depends on the implementation and avoids possible errors in the configuration files. The connector configuration then declares only the Java class, as is done for every other type of connector:
connector.class=myconnectorpackage.MySourceConnector
General
To start a Connect session, use KNet Connect or follow the information available at https://kafka.apache.org/documentation/#connect and https://kafka.apache.org/quickstart#quickstart_kafkaconnect.
The commands related to Apache Kafka™ Connect are:
- ConnectDistributed: starts a distributed session
- ConnectStandalone: starts a standalone session
For details, see https://kafka.apache.org/documentation/#connect and https://kafka.apache.org/quickstart#quickstart_kafkaconnect.
Standalone
This guide focuses on the standalone version. It assumes that an assembly has already been generated: see Template Usage Guide. Put the assembly in a folder (C:\MyConnect), then move into that folder. As explained in https://kafka.apache.org/documentation/#connect, Apache Kafka™ Connect needs at least one configuration file; in standalone mode it needs two:
- The first file is connect-standalone.properties (or connect-distributed.properties for distributed environments): this file contains configuration information for Apache Kafka™ Connect;
- The second file (optional for distributed version) defines the connector to use and its options.
In the config folder the user can find many configuration files. The files named connect-knet-sink.properties and connect-knet-source.properties contain examples for sink and source connectors.
Copy connect-standalone.properties into C:\MyConnect and update it, in particular the line containing bootstrap.servers; then copy connect-knet-sink.properties or connect-knet-source.properties, depending on the connector type, and update it using the information available in Configuration properties.
Execution
When the C:\MyConnect folder contains all the files, Apache Kafka™ Connect can be run:
- .NET hosted runtime:
knetconnect -s connect-standalone.properties connect-knet-sink.properties
- JVM hosted runtime:
connect-standalone.sh connect-standalone.properties connect-knet-sink.properties
Distributed
As stated in the Apache Kafka™ Connect guide, the distributed version does not use the connector definition file; instead, connectors are managed through the REST interface. The start-up command, run within the C:\MyConnect folder, becomes:
- .NET hosted runtime
knetconnect -d connect-distributed.properties
- JVM hosted runtime
connect-distributed.sh connect-distributed.properties
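Once the distributed worker is running, the connector definition that the standalone properties file would carry is submitted through the Connect REST interface instead. The sketch below is an illustration under assumptions: it supposes a worker reachable at http://localhost:8083 (the default Kafka Connect REST port), reuses the property names from Configuration properties, and uses hypothetical names for the connector (my-knet-sink) and topic (my-topic). The class ConnectorRegistration and its small JSON builder are not part of KNet.

```java
import java.net.URI;
import java.net.http.HttpClient;
import java.net.http.HttpRequest;
import java.net.http.HttpResponse;
import java.util.LinkedHashMap;
import java.util.Map;
import java.util.StringJoiner;

// Illustrative helper: NOT part of KNet. Registers a connector through the Connect REST API.
public class ConnectorRegistration {

    // Escapes backslashes and double quotes so the value is a valid JSON string.
    public static String quote(String s) {
        return "\"" + s.replace("\\", "\\\\").replace("\"", "\\\"") + "\"";
    }

    // Builds the body expected by POST /connectors: {"name": ..., "config": {...}}.
    public static String buildBody(String name, Map<String, String> config) {
        StringJoiner entries = new StringJoiner(",", "{", "}");
        for (Map.Entry<String, String> e : config.entrySet()) {
            entries.add(quote(e.getKey()) + ":" + quote(e.getValue()));
        }
        return "{\"name\":" + quote(name) + ",\"config\":" + entries + "}";
    }

    public static void main(String[] args) throws Exception {
        // Same keys that a connect-knet-sink.properties file would contain.
        Map<String, String> config = new LinkedHashMap<>();
        config.put("connector.class", "KNetSinkConnector");
        config.put("tasks.max", "1");
        config.put("topics", "my-topic"); // hypothetical topic name
        config.put("knet.dotnet.classname",
                "MASES.KNet.Template.KNetConnect.KNetConnectSink, knetConnectSink");

        // Assumed worker address: adjust host and port to the actual deployment.
        HttpRequest request = HttpRequest.newBuilder(URI.create("http://localhost:8083/connectors"))
                .header("Content-Type", "application/json")
                .POST(HttpRequest.BodyPublishers.ofString(buildBody("my-knet-sink", config)))
                .build();
        HttpResponse<String> response =
                HttpClient.newHttpClient().send(request, HttpResponse.BodyHandlers.ofString());
        System.out.println(response.statusCode() + " " + response.body());
    }
}
```

The hand-written JSON builder keeps the example free of external dependencies; a real project would more likely use a JSON library.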
Configuration properties
Each connector needs some configuration properties. Some common configuration properties, inherited from Apache Kafka™ Connect, are:
- name=value where the value is the name of the connector
- connector.class=value where the value is the connector Java™ class and must be:
  - KNetSinkConnector for sink connectors
  - KNetSourceConnector for source connectors
- tasks.max=value where the value represents the maximum number of tasks the connector can allocate
Other properties are listed at https://kafka.apache.org/41/configuration/kafka-connect-configs.
The mandatory configuration property needed by KNet Connect SDK is:
- knet.dotnet.classname=value where the value is the .NET class name in the form of FullName, AssemblyName, e.g. MASES.KNet.Template.KNetConnect.KNetConnectSink, knetConnectSink
When the connector is based on a JVM hosted runtime, other optional properties are available:
- knet.dotnet.assembly.location=value where the value represents the location where to find the connector assembly containing the class in knet.dotnet.classname=value
Note
Starting from KNet 3.1.2, knet.dotnet.classname=value is no longer mandatory and, like knet.dotnet.assembly.location=value, can be set at code level: see Programmatically properties overrides
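As a quick sanity check of the properties described in this section, the following sketch loads a connector definition and reports which expected keys are missing. It is an illustration under assumptions: ConnectorConfigCheck and missingKeys are hypothetical names, not part of KNet, and the flag classNameSetInCode models the KNet 3.1.2 case where knet.dotnet.classname is supplied by the Java class instead of the file.

```java
import java.io.IOException;
import java.io.StringReader;
import java.io.UncheckedIOException;
import java.util.ArrayList;
import java.util.List;
import java.util.Properties;

// Hypothetical pre-flight check: NOT part of KNet.
public class ConnectorConfigCheck {

    // Returns the expected keys that are absent or blank in the given properties text.
    public static List<String> missingKeys(String propertiesText, boolean classNameSetInCode) {
        Properties props = new Properties();
        try {
            props.load(new StringReader(propertiesText));
        } catch (IOException e) {
            throw new UncheckedIOException(e);
        }

        List<String> expected = new ArrayList<>(List.of("name", "connector.class"));
        if (!classNameSetInCode) {
            // Needed in the file unless the Java class provides it at code level.
            expected.add("knet.dotnet.classname");
        }

        List<String> missing = new ArrayList<>();
        for (String key : expected) {
            String value = props.getProperty(key);
            if (value == null || value.trim().isEmpty()) {
                missing.add(key);
            }
        }
        return missing;
    }

    public static void main(String[] args) {
        String text = String.join("\n",
                "name=my-knet-source", // hypothetical connector name
                "connector.class=KNetSourceConnector",
                "tasks.max=1");
        System.out.println(missingKeys(text, false)); // class name expected in the file
        System.out.println(missingKeys(text, true));  // class name provided at code level
    }
}
```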
Source connector
A source connector needs other configuration properties inherited from Apache Kafka™ Connect like the following.
Exactly Once and Transaction properties for Source Connector
From version 3.3.1 of Apache Kafka™ Connect it is possible to manage Exactly Once and Transaction in the source connector.
Two new fallback options are available in case the infrastructure is not ready to receive, on the .NET side, the requests for the values related to Exactly Once and Transaction:
- knet.dotnet.source.exactlyOnceSupport=value: set value to true if .NET Source Connector supports Exactly Once
- knet.dotnet.source.canDefineTransactionBoundaries=value: set value to true if .NET Source Connector can define Transaction Boundaries
Sink connector
A sink connector needs other configuration properties inherited from Apache Kafka™ Connect like:
- topics=value where the value is the comma-separated list of topics from which the records are read
Enable advanced logging
KNet Connect SDK supports advanced logging of JCOBridge low-level information; however, the interface IJCEventLog was designed for advanced logging in debug scenarios:
- void FusionLog(String msg): invoked when a fusion event occurs and the developer shall be informed
- void EventLog(String msg): invoked on each low-level event, to understand the low-level flow
From KNet version 3.1.2, registration for this kind of log is disabled by default to avoid useless overhead. When needed, the registration shall be enabled explicitly with this procedure:
- define an environment variable named KNetConnectEnableJCOBridgeLogging: when the connect runtime restarts, the proxy will register to receive log notifications
- use the DEBUG level in trace configuration: the implementation of IJCEventLog writes the log at debug level, so the user can reduce the verbosity by managing the logging configuration