ページの先頭行へ戻る
Interstage Big DataComplex Event Processing Server V1.1.0 開発リファレンス
FUJITSU Software

3.5.3 Socketアダプター

サンプルプログラムのソースコード:

1
2
3
4
5
6
7
8
9
10
11
12
13
14
15
16
17
18
19
20
21
22
23
24
25
26
27
28
29
30
31
32
33
34
35
36
37
38
39
40
41
42
43
44
45
46
47
48
49
50
51
52
53
54
55
56
57
58
59
60
61

62
63
64
65

66
67
68
69
70
71
72
73
74
75
76
77
78
79
80
81
82
83
84
85
86
87
88
89
90
91
92
93
94
95
96
97
98
99
100
101
102
103
104
import java.io.BufferedOutputStream;
import java.io.BufferedReader;
import java.io.DataOutputStream;
import java.io.IOException;
import java.io.InputStreamReader;
import java.net.InetAddress;
import java.net.InetSocketAddress;
import java.net.Socket;

public class SocketClient {
Socket s = new Socket();

public static void main(String[] args) {
String hostName = "";
int port = 0;
String dataType = "";
String charSet = "";
String eventTypeId = "";
String data = "";
long lWait = 10;
int loop = 0;
int dataCount = 1;

if (args.length != 9) {
System.out.println("param is Abnormal");
return;
}

try {
hostName = String.valueOf(args[0]);
port = Integer.valueOf(args[1]);
dataType = String.valueOf(args[2]);
charSet = String.valueOf(args[3]);
eventTypeId = String.valueOf(args[4]);
data = String.valueOf(args[5]);
lWait = Long.valueOf(args[6]);
loop = Integer.valueOf(args[7]);
dataCount = Integer.valueOf(args[8]);

SocketClient c = new SocketClient(hostName, port);
c.sendMessage(dataCount, dataType, charSet, eventTypeId, data, loop, lWait);

System.out.println(c.readResponse());

} catch (InterruptedException e) {
e.printStackTrace();
return;
} catch (Exception e) {
e.printStackTrace();
return;
}
}

public void sendMessage(int dataCount, String dataType, String charSet, String eventTypeId,
String data, int loop, long wait) throws IOException, InterruptedException {

DataOutputStream dos = new DataOutputStream(
new BufferedOutputStream(s.getOutputStream()));

for (int k = 0; k < loop; k++) {

for (int i = 0; i < dataCount; i++) {
int count = ( k * dataCount ) + i;
String msg = data.replaceAll("%COUNTER%", String.valueOf(count));

int length = dataType.getBytes().length + 4 + eventTypeId.getBytes().length
+ 4 + charSet.getBytes().length + 4 + msg.getBytes().length;
dos.writeInt(length);

dos.write(dataType.getBytes());

dos.writeInt(eventTypeId.getBytes().length);
dos.write(eventTypeId.getBytes());

dos.writeInt(charSet.getBytes().length);
dos.write(charSet.getBytes());

dos.writeInt(msg.getBytes().length);
dos.write(msg.getBytes(charSet));

dos.flush();
}

Thread.sleep(wait);
}
dos.writeInt(0);
dos.flush();
}

public SocketClient(String host, int port) throws IOException {
InetSocketAddress address = new InetSocketAddress(InetAddress.getByName(host),
Integer.valueOf(port));
s.setSendBufferSize(1000000000);
s.connect(address);
}

public String readResponse() throws IOException {
BufferedReader br = new BufferedReader( new InputStreamReader(s.getInputStream()));

String ret =br.readLine();
System.out.println("RESPONSE:" + ret);
return "";
}
}

ソースコードについて解説します。

3.5.3.1 サンプル実行例(CSVデータを送信する場合)

サンプルの実行例を以下に記述します。

なお、例ではDebugLogListenerを使用し、エンジンログにデバッグ情報を出力しています。

コマンド実行結果

# java -cp ./ SocketClient localhost 8001 CSV UTF-8 CSVEvent SOCKET,CSV,%COUNTER% 1 2 2 <ENTER>
RESPONSE:0000:6:Sending message completed normally.

エンジンログ出力結果

2012-07-29 13:27:49,410 [DEBUG] abc:length=1
abc[0]
operation :CSV: String
count :0: String
ID :SOCKET: String

2012-07-29 13:27:49,422 [DEBUG] abc:length=1
abc[0]
operation :CSV: String
count :1: String
ID :SOCKET: String

2012-07-29 13:27:49,427 [DEBUG] abc:length=1
abc[0]
operation :CSV: String
count :2: String
ID :SOCKET: String

2012-07-29 13:27:49,428 [DEBUG] abc:length=1
abc[0]
operation :CSV: String
count :3: String
ID :SOCKET: String

3.5.3.2 サンプル実行例(XMLデータを送信する場合)

サンプルの実行例を以下に記述します。

なお、例ではDebugLogListenerを使用し、エンジンログにデバッグ情報を出力しています。

コマンド実行結果

# java -cp ./ SocketClient localhost 8001 XML UTF-8 XMLEvent '<?xml version="1.0" encoding="UTF-8"?>\
<XMLEvent><ID>SOCKET</ID> <operation>XML</operation> <count>%COUNTER%</count></XMLEvent>' 1 2 2 <ENTER>
RESPONSE:0000:4:Sending message completed normally.

(注) 見やすさのため、上記の例では入力コマンドの途中で「\」で改行しています。実際には「\」は入力せず、次の行の内容を続けて入力します。

エンジンログ出力結果

2012-07-29 13:30:56,861 [DEBUG] abc--0:length=1
abc--0[0]

operation :XML: String
count :0: String
ID :SOCKET: String

2012-07-29 13:30:56,862 [DEBUG] abc--0:length=1
abc--0[0]
operation :XML: String
count :1: String
ID :SOCKET: String

2012-07-29 13:30:56,865 [DEBUG] abc--0:length=1
abc--0[0]
operation :XML: String
count :2: String
ID :SOCKET: String

2012-07-29 13:30:56,865 [DEBUG] abc--0:length=1
abc--0[0]
operation :XML: String
count :3: String
ID :SOCKET: String