Sending CloudEvents-format messages over gRPC is slow (I don't know whether all producers are slow) #5030

Open
LeeMoonCh opened this issue Jul 12, 2024 · 3 comments
Labels
question Further information is requested


LeeMoonCh commented Jul 12, 2024

Search before asking

  • I had searched in the issues and found no similar issues.

Question

Sending CloudEvents-format messages over gRPC is slow (I'm not sure whether all producers are slow): sending 101 events takes over 60 seconds.

The code looks like this:

            String gPort = ConfigUtil.getProperty("em.grpc.port");
            String host = ConfigUtil.getProperty("em.host");
            String pwd = ConfigUtil.getProperty("em.pwd");
            String user = ConfigUtil.getProperty("em.user");

            EventMeshGrpcClientConfig config = EventMeshGrpcClientConfig.builder()
                .serverAddr("172.16.15.136")
                .serverPort(51112)
                .consumerGroup(PRODUCER_GROUP)
                .password(pwd)
                .userName(user)
                .env(ENV)
                .idc(IDC)
                .sys(SYSID)
                .build();

            EventMeshGrpcProducer producer = new EventMeshGrpcProducer(config);

....

                CloudEvent event = CloudEventBuilder.v1()
                    .withId(UUID.randomUUID().toString())
                    .withSubject("tc_event_device") //topic
                    .withSource(URI.create("/"))
                    .withDataContentType(CLOUDEVENT_CONTENT_TYPE)
                    .withType(CLOUD_EVENTS_PROTOCOL_NAME)
                    .withData(eventJson.toJSONString().getBytes(StandardCharsets.UTF_8))
                    .withExtension("ttl", String.valueOf(4 * 1000))
                    .build();

                list.add(event);
                // Send once more than 100 events are buffered (i.e. at 101).
                if (list.size() > 100) {
                    sendMsg2EventMesh(producer);
                }


......

    private void sendMsg2EventMesh(EventMeshGrpcProducer producer) {
        System.out.println("~~~~~~~~~~~~~~~~~~~~~~~~~~~~");
        logger.info("Number of events to send: " + list.size());
        long start = System.currentTimeMillis();
        // Time spent building this batch since the previous send finished.
        System.out.println("Finished processing data: " + start + " elapsed: " + (start - begin));
        CloudEvent cloudEvent = list.get(list.size() - 1);
        String cs = JSON.parseObject(new String(cloudEvent.getData().toBytes(), StandardCharsets.UTF_8)).getString("customCode");
        producer.publish(list);
        // The elapsed time below covers the JSON parse above plus the publish call.
        System.out.println("Sent successfully! " + cs + " elapsed: " + (System.currentTimeMillis() - start) + " count: " + list.size());
        list.clear();
        begin = System.currentTimeMillis();
    }

The results are shown in the following screenshot:
[image]

The server is configured as follows:

eventmesh.properties

#
# Licensed to the Apache Software Foundation (ASF) under one or more
# contributor license agreements.  See the NOTICE file distributed with
# this work for additional information regarding copyright ownership.
# The ASF licenses this file to You under the Apache License, Version 2.0
# (the "License"); you may not use this file except in compliance with
# the License.  You may obtain a copy of the License at
#
#     http://www.apache.org/licenses/LICENSE-2.0
#
# Unless required by applicable law or agreed to in writing, software
# distributed under the License is distributed on an "AS IS" BASIS,
# WITHOUT WARRANTIES OR CONDITIONS OF ANY KIND, either express or implied.
# See the License for the specific language governing permissions and
# limitations under the License.
#
###############################EVNETMESH-runtime ENV#################################
eventMesh.server.idc=idc
eventMesh.server.env=env
eventMesh.server.provide.protocols=HTTP,TCP,GRPC
eventMesh.server.cluster=COMMON
eventMesh.server.name=EVENTMESH-runtime
eventMesh.sysid=1234
eventMesh.server.http.port=51113
eventMesh.server.grpc.port=51112
########################## eventMesh tcp configuration ############################
eventMesh.server.tcp.enabled=true
eventMesh.server.tcp.port=51111
eventMesh.server.tcp.readerIdleSeconds=120
eventMesh.server.tcp.writerIdleSeconds=120
eventMesh.server.tcp.allIdleSeconds=120
eventMesh.server.tcp.clientMaxNum=10000
# client isolation time if the message send failure
eventMesh.server.tcp.pushFailIsolateTimeInMills=30000
# rebalance internal
eventMesh.server.tcp.RebalanceIntervalInMills=30000
# session expire time about client
eventMesh.server.session.expiredInMills=60000
# flow control, include the global level and session level
eventMesh.server.tcp.msgReqnumPerSecond=15000
eventMesh.server.http.msgReqnumPerSecond=15000
eventMesh.server.session.upstreamBufferSize=20

# for single event publish, maximum size allowed per event
eventMesh.server.maxEventSize=1048576
# for batch event publish, maximum number of events allowed in one batch
eventMesh.server.maxEventBatchSize=1000

# thread number about global scheduler
eventMesh.server.global.scheduler=5
eventMesh.server.tcp.taskHandleExecutorPoolSize=8
#retry
eventMesh.server.retry.async.pushRetryTimes=3
eventMesh.server.retry.sync.pushRetryTimes=3
eventMesh.server.retry.async.pushRetryDelayInMills=500
eventMesh.server.retry.sync.pushRetryDelayInMills=500
eventMesh.server.retry.pushRetryQueueSize=10000
#admin
eventMesh.server.admin.http.port=10106
#registry
eventMesh.server.registry.registerIntervalInMills=10000
eventMesh.server.registry.fetchRegistryAddrIntervalInMills=20000
#auto-ack
#eventMesh.server.defibus.client.comsumeTimeoutInMin=5

#sleep interval between closing client of different group in server graceful shutdown
eventMesh.server.gracefulShutdown.sleepIntervalInMills=1000
eventMesh.server.rebalanceRedirect.sleepIntervalInMills=200

#ip address blacklist
eventMesh.server.blacklist.ipv4=0.0.0.0/8,127.0.0.0/8,169.254.0.0/16,255.255.255.255/32
eventMesh.server.blacklist.ipv6=::/128,::1/128,ff00::/8

#connector plugin
eventMesh.connector.plugin.type=rocketmq

#storage plugin
eventMesh.storage.plugin.type=rocketmq

#security plugin
#eventMesh.server.security.enabled=false
#eventMesh.security.plugin.type=security
#eventMesh.security.validation.type.token=false
#eventMesh.security.publickey=

#registry plugin
eventMesh.registry.plugin.enabled=false
eventMesh.registry.plugin.type=nacos
eventMesh.registry.plugin.server-addr=127.0.0.1:8848
eventMesh.registry.plugin.username=nacos
eventMesh.registry.plugin.password=nacos

# The TLS configuration of registry plugin consul
# keyStoreInstanceType's value can refer to com.ecwid.consul.transport.TLSConfig.KeyStoreInstanceType
#eventMesh.registry.consul.tls.keyStoreInstanceType=
#eventMesh.registry.consul.tls.certificatePath=
#eventMesh.registry.consul.tls.certificatePassword=
#eventMesh.registry.consul.tls.keyStorePath=
#eventMesh.registry.consul.tls.keyStorePassword=

# metrics plugin, if you have multiple plugin, you can use ',' to split
#eventMesh.metrics.plugin=prometheus

# trace plugin
eventMesh.server.trace.enabled=false
eventMesh.trace.plugin=zipkin

# webhook
# Start webhook admin service
eventMesh.webHook.admin.start=true
# Webhook event configuration storage mode. Currently, only file and Nacos are supported
eventMesh.webHook.operationMode=file
# The file storage path of the file storage mode. If #{eventmeshhome} is written, it is in the eventmesh root directory
eventMesh.webHook.fileMode.filePath= #{eventMeshHome}/webhook
# Nacos storage mode, and the configuration naming rule is eventmesh webHook. nacosMode. {nacos native configuration key} please see the specific configuration [nacos github api](https://github.com/alibaba/nacos/blob/develop/api/src/main/java/com/alibaba/nacos/api/SystemPropertyKeyConst.java)
## Address of Nacos
eventMesh.webHook.nacosMode.serverAddr=0.0.0.0:8848
# Webhook eventcloud sending mode. And eventmesh connector. plugin. The type configuration is the same
eventMesh.webHook.producer.storage=standalone

eventMesh.server.flushDiskType=ASYNC_FLUSH

server.env

APP_START_JVM_OPTION:::-server -Xms1g -Xmx16g -Xmn4g  -XX:+UseG1GC -XX:MaxGCPauseMillis=20 -XX:ConcGCThreads=8 -XX:ParallelGCThreads=8 -XX:MaxDirectMemorySize=16G -Dio.netty.eventLoopThreads=32 -Dio.netty.maxDirectMemory=16G -XX:SurvivorRatio=4 -Duser.language=zh

What am I doing wrong?
I don't think a producer should be this slow!
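
One way to narrow down where the 60 seconds go is to time the send call on its own, separately from the time spent building events. The following is a minimal sketch under stated assumptions, not EventMesh code: `TimedBatcher` is a hypothetical helper, and the `sender` lambda is a stand-in for the real `producer.publish(list)` call from the snippet above.

```java
import java.util.ArrayList;
import java.util.List;
import java.util.function.Consumer;

/** Hypothetical helper (not part of the EventMesh SDK): buffers items and times each flush. */
final class TimedBatcher<T> {
    private final int batchSize;
    private final Consumer<List<T>> sender;
    private final List<T> buffer = new ArrayList<>();
    private long lastFlushNanos = 0;
    private int flushCount = 0;

    TimedBatcher(int batchSize, Consumer<List<T>> sender) {
        if (batchSize <= 0) {
            throw new IllegalArgumentException("batchSize must be positive");
        }
        this.batchSize = batchSize;
        this.sender = sender;
    }

    /** Adds one item and flushes once the buffer holds batchSize items. */
    void add(T item) {
        buffer.add(item);
        if (buffer.size() >= batchSize) {
            flush();
        }
    }

    /** Sends whatever is buffered; only the sender call itself is timed. */
    void flush() {
        if (buffer.isEmpty()) {
            return;
        }
        List<T> batch = new ArrayList<>(buffer); // copy, so clearing the buffer cannot affect the sender
        buffer.clear();
        long start = System.nanoTime();
        sender.accept(batch);
        lastFlushNanos = System.nanoTime() - start;
        flushCount++;
    }

    int flushCount() { return flushCount; }
    int pending() { return buffer.size(); }
    long lastFlushMillis() { return lastFlushNanos / 1_000_000; }

    public static void main(String[] args) {
        List<Integer> sizes = new ArrayList<>();
        // Stand-in sender: records batch sizes. Swap in the real publish call when measuring.
        TimedBatcher<String> batcher = new TimedBatcher<>(101, batch -> sizes.add(batch.size()));
        for (int i = 0; i < 250; i++) {
            batcher.add("event-" + i);
        }
        batcher.flush(); // send the remainder
        System.out.println("batch sizes: " + sizes + ", last flush ms: " + batcher.lastFlushMillis());
    }
}
```

If the timed portion accounts for nearly all of the delay, the time is going into the send call or the server rather than into event construction.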

LeeMoonCh added the question label Jul 12, 2024

Welcome to the Apache EventMesh community!!
We are glad that you are contributing by opening this issue. :D

Please make sure to include all the relevant context.
We will be here shortly.

If you are interested in contributing to our project, please let us know!
You can check out our contributing guide on contributing to EventMesh.

LeeMoonCh (Author) commented

2024-07-12 22:31:33,260 WARN  [eventMesh-tcpNettyEpoll-Boss-1] ServerBootstrap(AbstractBootstrap.java:464) - Unknown channel option 'SO_TIMEOUT' for channel '[id: 0xb8d69f6f, L:/172.16.15.136:51111 - R:/10.2.1.116:51565]'
2024-07-12 22:31:33,263 INFO  [eventMesh-tcp-worker-6] EventMeshTcpConnectionHandler(EventMeshTcpConnectionHandler.java:48) - client|tcp|channelRegistered|remoteAddress=10.2.1.116:51565|msg=
2024-07-12 22:31:33,263 INFO  [eventMesh-tcp-worker-6] EventMeshTcpConnectionHandler(EventMeshTcpConnectionHandler.java:62) - client|tcp|channelActive|remoteAddress=10.2.1.116:51565|msg=
2024-07-12 22:31:33,692 INFO  [eventMesh-tcp-worker-6] message(EventMeshTcpMessageDispatcher.java:100) - pkg|c2eventMesh|cmd=HELLO_REQUEST|pkg=org.apache.eventmesh.common.protocol.tcp.Package@63c8d488
2024-07-12 22:31:33,693 ERROR [eventMesh-tcp-task-handle-6] message(HelloTask.java:95) - HelloTask failed|address=/10.2.1.116:51565,errMsg=java.lang.Exception: client purpose config is error
2024-07-12 22:31:33,695 INFO  [eventMesh-tcp-worker-6] message(Utils.java:128) - pkg|eventMesh2c|cmd=HELLO_RESPONSE|pkg=org.apache.eventmesh.common.protocol.tcp.Package@5e6d5da9|user=UserAgent{env='env', subsystem='5556', group='EventmeshTestGroup', path='C:/em', pid=32893, host='10.2.1.116', port=8362, version='2.0.11', idc='1234', purpose='null', unack='0'}|wait=0ms|cost=2ms

In addition, when I try to send a message over TCP, I get the error shown in the log above ("client purpose config is error").

My code:

            UserAgent userAgent = UserAgent.builder()
                .env(ENV)
                .host("10.2.1.116")
                .password(generateRandomString(UtilsConstants.PASSWORD_LENGTH))
                .username("admin")
                .group(UtilsConstants.GROUP)
                .path("C:/em")
                .port(8362)
                .subsystem("5556")
                .pid(32_893)
                .version(UtilsConstants.VERSION)
                .idc(SYSID)
                .build();
            EventMeshTCPClientConfig eventMeshTcpClientConfig = EventMeshTCPClientConfig.builder()
                .host("172.16.15.136")
                .port(51111)
                .userAgent(userAgent)
                .build();
            final EventMeshTCPClient<CloudEvent> producer =
                EventMeshTCPClientFactory.createEventMeshTCPClient(eventMeshTcpClientConfig, CloudEvent.class);
            producer.init();

So now I am confused. Neither the official documentation nor the examples describe UserAgent in detail, which is odd.
xwm1992 (Contributor) commented Jul 24, 2024

client purpose config is error

This error means your UserAgent is missing the purpose property. Take a look at the EventMeshTestUtils.generateClient1() method; it may help.
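
The failure mode can be illustrated with a small self-contained model. This is a sketch only: `PurposeCheckSketch`, `AgentInfo`, and `validatePurpose` are hypothetical stand-ins and not EventMesh classes, and the accepted values "pub" and "sub" are an assumption that should be verified against the EventMesh version in use. The log above shows `purpose='null'`, which is the case the model rejects.

```java
import java.util.Set;

/** Hypothetical model of a server-side purpose check; not EventMesh code. */
final class PurposeCheckSketch {
    // Assumed accepted values; verify against the EventMesh version in use.
    static final Set<String> ACCEPTED = Set.of("pub", "sub");

    /** Simplified stand-in for the client's UserAgent, holding only the fields used here. */
    static final class AgentInfo {
        final String group;
        final String purpose; // stays null when the builder never sets it

        AgentInfo(String group, String purpose) {
            this.group = group;
            this.purpose = purpose;
        }

        AgentInfo withPurpose(String newPurpose) {
            return new AgentInfo(group, newPurpose);
        }
    }

    /** Rejects a missing or unknown purpose with the message seen in the server log. */
    static void validatePurpose(AgentInfo agent) {
        // Null is checked first because Set.of(...).contains(null) throws NullPointerException.
        if (agent.purpose == null || !ACCEPTED.contains(agent.purpose)) {
            throw new IllegalArgumentException("client purpose config is error");
        }
    }

    public static void main(String[] args) {
        AgentInfo noPurpose = new AgentInfo("EventmeshTestGroup", null);
        try {
            validatePurpose(noPurpose);
        } catch (IllegalArgumentException e) {
            System.out.println("rejected: " + e.getMessage());
        }
        validatePurpose(noPurpose.withPurpose("pub"));
        System.out.println("accepted with purpose=pub");
    }
}
```

In the real client, the corresponding change would presumably be to set the purpose property on the UserAgent before calling init(); EventMeshTestUtils.generateClient1() shows the exact call to use.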
